/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *    POSTGRES shared cache invalidation data manager.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/storage/ipc/sinvaladt.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/transam.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"


/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing
 * a message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum. (They are equal when there are no messages pending.) For each
 * active backend, there is a nextMsgNum pointer indicating the next message it
 * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * backend.
 *
 * (In the current implementation, minMsgNum is a lower bound for the
 * per-process nextMsgNum values, but it isn't rigorously kept equal to the
 * smallest nextMsgNum --- it may lag behind. We only update it when
 * SICleanupQueue is called, and we try not to do that often.)
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries. We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2). As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer. If the buffer does overflow, we recover by setting the
 * "reset" flag for each backend that has fallen too far behind. A backend
 * that is in "reset" state is ignored while determining minMsgNum. When
 * it does finally attempt to receive inval messages, it must discard all
 * its invalidatable state, since it won't know what it missed.
 *
 * To reduce the probability of needing resets, we send a "catchup" interrupt
 * to any backend that seems to be falling unreasonably far behind. The
 * normal behavior is that at most one such interrupt is in flight at a time;
 * when a backend completes processing a catchup interrupt, it executes
 * SICleanupQueue, which will signal the next-furthest-behind backend if
 * needed. This avoids undue contention from multiple backends all trying
 * to catch up at once. However, the furthest-back backend might be stuck
 * in a state where it can't catch up. Eventually it will get reset, so it
 * won't cause any more problems for anyone but itself. But we don't want
 * to find that a bunch of other backends are now too close to the reset
 * threshold to be saved. So SICleanupQueue is designed to occasionally
 * send extra catchup interrupts as the queue gets fuller, to backends that
 * are far behind and haven't gotten one yet. As long as there aren't a lot
 * of "stuck" backends, we won't need a lot of extra interrupts, since ones
 * that aren't stuck will propagate their interrupts to the next guy.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously. MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often. It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
 *
 * Access to the shared sinval array is protected by two locks, SInvalReadLock
 * and SInvalWriteLock. Readers take SInvalReadLock in shared mode; this
 * authorizes them to modify their own ProcState but not to modify or even
 * look at anyone else's. When we need to perform array-wide updates,
 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
 * lock out all readers. Writers take SInvalWriteLock (always in exclusive
 * mode) to serialize adding messages to the queue. Note that a writer
 * can operate in parallel with one or more readers, because the writer
 * has no need to touch anyone's ProcState, except in the infrequent cases
 * when SICleanupQueue is needed. The only point of overlap is that
 * the writer wants to change maxMsgNum while readers need to read it.
 * We deal with that by having a spinlock that readers must take for just
 * long enough to read maxMsgNum, while writers take it for just long enough
 * to write maxMsgNum. (The exact rule is that you need the spinlock to
 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
 * spinlock to write maxMsgNum unless you are holding both locks.)
 *
 * Note: since maxMsgNum is an int and hence presumably atomically readable/
 * writable, the spinlock might seem unnecessary. The reason it is needed
 * is to provide a memory barrier: we need to be sure that messages written
 * to the array are actually there before maxMsgNum is increased, and that
 * readers will see that data after fetching maxMsgNum. Multiprocessors
 * that have weak memory-ordering guarantees can fail without the memory
 * barrier instructions that are included in the spinlock sequences.
 */
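/*
 * Worked example of the numbering scheme above (using MAXNUMMESSAGES = 4096,
 * as defined below): message number 10000 maps to circular-buffer slot
 * 10000 % 4096 = 1808. A backend whose nextMsgNum has fallen more than
 * 4096 behind maxMsgNum can no longer be guaranteed that its unread slots
 * are still present, which is exactly the condition that forces it into
 * "reset" state before the space is reused.
 */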


/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 * Must be a power of 2 for speed.
 *
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 * Must be a multiple of MAXNUMMESSAGES. Should be large.
 *
 * CLEANUP_MIN: the minimum number of messages that must be in the buffer
 * before we bother to call SICleanupQueue.
 *
 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
 * we exceed CLEANUP_MIN. Should be a power of 2 for speed.
 *
 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
 * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
 *
 * WRITE_QUANTUM: the max number of messages to push into the buffer per
 * iteration of SIInsertDataEntries. Noncritical but should be less than
 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
 * per iteration.
 */

#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64
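
/*
 * A minimal compile-time sanity check for the constraints documented above
 * (an illustrative sketch; the surrounding code does not depend on it):
 * MAXNUMMESSAGES must be a power of 2, and MSGNUMWRAPAROUND a multiple of
 * it. With the values above, MSGNUMWRAPAROUND = 4096 * 262144 = 2^30,
 * which still fits comfortably in a signed 32-bit int.
 */
#if MAXNUMMESSAGES & (MAXNUMMESSAGES - 1)
#error "MAXNUMMESSAGES must be a power of 2"
#endif
#if MSGNUMWRAPAROUND % MAXNUMMESSAGES != 0
#error "MSGNUMWRAPAROUND must be a multiple of MAXNUMMESSAGES"
#endif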

/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
    /* procPid is zero in an inactive ProcState array entry. */
    pid_t       procPid;        /* PID of backend, for signaling */
    PGPROC     *proc;           /* PGPROC of backend */
    /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
    int         nextMsgNum;     /* next message number to read */
    bool        resetState;     /* backend needs to reset its state */
    bool        signaled;       /* backend has been sent catchup signal */
    bool        hasMessages;    /* backend has unread messages */

    /*
     * Backend only sends invalidations, never receives them. This only makes
     * sense for the startup process during recovery because it doesn't
     * maintain a relcache, yet it fires inval messages to allow query
     * backends to see schema changes.
     */
    bool        sendOnly;       /* backend only sends, never receives */

    /*
     * Next LocalTransactionId to use for each idle backend slot. We keep
     * this here because it is indexed by BackendId and it is convenient to
     * copy the value to and from local memory when MyBackendId is set. It's
     * meaningless in an active ProcState entry.
     */
    LocalTransactionId nextLXID;
} ProcState;

/* Shared cache invalidation memory segment */
typedef struct SISeg
{
    /*
     * General state information
     */
    int         minMsgNum;      /* oldest message still needed */
    int         maxMsgNum;      /* next message number to be assigned */
    int         nextThreshold;  /* # of messages to call SICleanupQueue */
    int         lastBackend;    /* index of last active procState entry, +1 */
    int         maxBackends;    /* size of procState array */

    slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */

    /*
     * Circular buffer holding shared-inval messages
     */
    SharedInvalidationMessage buffer[MAXNUMMESSAGES];

    /*
     * Per-backend invalidation state info (has MaxBackends entries).
     */
    ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;
191
192static SISeg *shmInvalBuffer; /* pointer to the shared inval buffer */
193
194
195static LocalTransactionId nextLocalTransactionId;
196
197static void CleanupInvalidationState(int status, Datum arg);
198
199
200/*
201 * SInvalShmemSize --- return shared-memory space needed
202 */
203Size
204SInvalShmemSize(void)
205{
206 Size size;
207
208 size = offsetof(SISeg, procState);
209 size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));
210
211 return size;
212}
213
214/*
215 * CreateSharedInvalidationState
216 * Create and initialize the SI message buffer
217 */
218void
219CreateSharedInvalidationState(void)
220{
221 int i;
222 bool found;
223
224 /* Allocate space in shared memory */
225 shmInvalBuffer = (SISeg *)
226 ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
227 if (found)
228 return;
229
230 /* Clear message counters, save size of procState array, init spinlock */
231 shmInvalBuffer->minMsgNum = 0;
232 shmInvalBuffer->maxMsgNum = 0;
233 shmInvalBuffer->nextThreshold = CLEANUP_MIN;
234 shmInvalBuffer->lastBackend = 0;
235 shmInvalBuffer->maxBackends = MaxBackends;
236 SpinLockInit(&shmInvalBuffer->msgnumLock);
237
238 /* The buffer[] array is initially all unused, so we need not fill it */
239
240 /* Mark all backends inactive, and initialize nextLXID */
241 for (i = 0; i < shmInvalBuffer->maxBackends; i++)
242 {
243 shmInvalBuffer->procState[i].procPid = 0; /* inactive */
244 shmInvalBuffer->procState[i].proc = NULL;
245 shmInvalBuffer->procState[i].nextMsgNum = 0; /* meaningless */
246 shmInvalBuffer->procState[i].resetState = false;
247 shmInvalBuffer->procState[i].signaled = false;
248 shmInvalBuffer->procState[i].hasMessages = false;
249 shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
250 }
251}

/*
 * SharedInvalBackendInit
 *      Initialize a new backend to operate on the sinval buffer
 */
void
SharedInvalBackendInit(bool sendOnly)
{
    int         index;
    ProcState  *stateP = NULL;
    SISeg      *segP = shmInvalBuffer;

    /*
     * This can run in parallel with read operations, but not with write
     * operations, since SIInsertDataEntries relies on lastBackend to set
     * hasMessages appropriately.
     */
    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    /* Look for a free entry in the procState array */
    for (index = 0; index < segP->lastBackend; index++)
    {
        if (segP->procState[index].procPid == 0)    /* inactive slot? */
        {
            stateP = &segP->procState[index];
            break;
        }
    }

    if (stateP == NULL)
    {
        if (segP->lastBackend < segP->maxBackends)
        {
            stateP = &segP->procState[segP->lastBackend];
            Assert(stateP->procPid == 0);
            segP->lastBackend++;
        }
        else
        {
            /*
             * out of procState slots: MaxBackends exceeded -- report normally
             */
            MyBackendId = InvalidBackendId;
            LWLockRelease(SInvalWriteLock);
            ereport(FATAL,
                    (errcode(ERRCODE_TOO_MANY_CONNECTIONS),
                     errmsg("sorry, too many clients already")));
        }
    }

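    /*
     * stateP now points at our allotted slot. Backend IDs are 1-based array
     * indexes: the procState[0] slot maps to BackendId 1, and so on.
     */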
    MyBackendId = (stateP - &segP->procState[0]) + 1;

    /* Advertise assigned backend ID in MyProc */
    MyProc->backendId = MyBackendId;

    /* Fetch next local transaction ID into local memory */
    nextLocalTransactionId = stateP->nextLXID;

    /* mark myself active, with all extant messages already read */
    stateP->procPid = MyProcPid;
    stateP->proc = MyProc;
    stateP->nextMsgNum = segP->maxMsgNum;
    stateP->resetState = false;
    stateP->signaled = false;
    stateP->hasMessages = false;
    stateP->sendOnly = sendOnly;

    LWLockRelease(SInvalWriteLock);

    /* register exit routine to mark my entry inactive at exit */
    on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

    elog(DEBUG4, "my backend ID is %d", MyBackendId);
}

/*
 * CleanupInvalidationState
 *      Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown.
 *
 * arg is really of type "SISeg*".
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
    SISeg      *segP = (SISeg *) DatumGetPointer(arg);
    ProcState  *stateP;
    int         i;

    Assert(PointerIsValid(segP));

    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    stateP = &segP->procState[MyBackendId - 1];

    /* Update next local transaction ID for next holder of this backendID */
    stateP->nextLXID = nextLocalTransactionId;

    /* Mark myself inactive */
    stateP->procPid = 0;
    stateP->proc = NULL;
    stateP->nextMsgNum = 0;
    stateP->resetState = false;
    stateP->signaled = false;

    /* Recompute index of last active backend */
    for (i = segP->lastBackend; i > 0; i--)
    {
        if (segP->procState[i - 1].procPid != 0)
            break;
    }
    segP->lastBackend = i;

    LWLockRelease(SInvalWriteLock);
}

/*
 * BackendIdGetProc
 *      Get the PGPROC structure for a backend, given the backend ID.
 *      The result may be out of date arbitrarily quickly, so the caller
 *      must be careful about how this information is used. NULL is
 *      returned if the backend is not active.
 */
PGPROC *
BackendIdGetProc(int backendID)
{
    PGPROC     *result = NULL;
    SISeg      *segP = shmInvalBuffer;

    /* Need to lock out additions/removals of backends */
    LWLockAcquire(SInvalWriteLock, LW_SHARED);

    if (backendID > 0 && backendID <= segP->lastBackend)
    {
        ProcState  *stateP = &segP->procState[backendID - 1];

        result = stateP->proc;
    }

    LWLockRelease(SInvalWriteLock);

    return result;
}

/*
 * BackendIdGetTransactionIds
 *      Get the xid and xmin of the backend. The result may be out of date
 *      arbitrarily quickly, so the caller must be careful about how this
 *      information is used.
 */
void
BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
{
    SISeg      *segP = shmInvalBuffer;

    *xid = InvalidTransactionId;
    *xmin = InvalidTransactionId;

    /* Need to lock out additions/removals of backends */
    LWLockAcquire(SInvalWriteLock, LW_SHARED);

    if (backendID > 0 && backendID <= segP->lastBackend)
    {
        ProcState  *stateP = &segP->procState[backendID - 1];
        PGPROC     *proc = stateP->proc;

        if (proc != NULL)
        {
            PGXACT     *xact = &ProcGlobal->allPgXact[proc->pgprocno];

            *xid = xact->xid;
            *xmin = xact->xmin;
        }
    }

    LWLockRelease(SInvalWriteLock);
}

/*
 * SIInsertDataEntries
 *      Add new invalidation message(s) to the buffer.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
    SISeg      *segP = shmInvalBuffer;

    /*
     * N can be arbitrarily large. We divide the work into groups of no more
     * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     * an unreasonably long time. (This is not so much because we care about
     * letting in other writers, as that some just-caught-up backend might be
     * trying to do SICleanupQueue to pass on its signal, and we don't want it
     * to have to wait a long time.) Also, we need to consider calling
     * SICleanupQueue every so often.
     */
    while (n > 0)
    {
        int         nthistime = Min(n, WRITE_QUANTUM);
        int         numMsgs;
        int         max;
        int         i;

        n -= nthistime;

        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

        /*
         * If the buffer is full, we *must* acquire some space. Clean the
         * queue and reset anyone who is preventing space from being freed.
         * Otherwise, clean the queue only when it's exceeded the next
         * fullness threshold. We have to loop and recheck the buffer state
         * after any call of SICleanupQueue.
         */
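        /*
         * For instance, with 4090 messages already pending and nthistime =
         * 64, 4090 + 64 > MAXNUMMESSAGES (4096), so we must run
         * SICleanupQueue before the new batch can fit.
         */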
        for (;;)
        {
            numMsgs = segP->maxMsgNum - segP->minMsgNum;
            if (numMsgs + nthistime > MAXNUMMESSAGES ||
                numMsgs >= segP->nextThreshold)
                SICleanupQueue(true, nthistime);
            else
                break;
        }

        /*
         * Insert new message(s) into proper slot of circular buffer
         */
        max = segP->maxMsgNum;
        while (nthistime-- > 0)
        {
            segP->buffer[max % MAXNUMMESSAGES] = *data++;
            max++;
        }

        /* Update current value of maxMsgNum using spinlock */
        SpinLockAcquire(&segP->msgnumLock);
        segP->maxMsgNum = max;
        SpinLockRelease(&segP->msgnumLock);

        /*
         * Now that the maxMsgNum change is globally visible, we give everyone
         * a swift kick to make sure they read the newly added messages.
         * Releasing SInvalWriteLock will enforce a full memory barrier, so
         * these (unlocked) changes will be committed to memory before we exit
         * the function.
         */
        for (i = 0; i < segP->lastBackend; i++)
        {
            ProcState  *stateP = &segP->procState[i];

            stateP->hasMessages = true;
        }

        LWLockRelease(SInvalWriteLock);
    }
}
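
/*
 * A hypothetical caller sketch (in the actual tree, the entry point that
 * funnels messages here is SendSharedInvalidMessages() in
 * src/backend/utils/cache/inval.c):
 *
 *      SharedInvalidationMessage msgs[...];    -- caller-chosen capacity
 *      ... fill msgs[0..nmsgs-1] ...
 *      SIInsertDataEntries(msgs, nmsgs);
 *
 * The WRITE_QUANTUM chunking above means nmsgs may be arbitrarily large.
 */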

/*
 * SIGetDataEntries
 *      get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *      0:   no SI message available
 *      n>0: next n SI messages have been extracted into data[]
 *      -1:  SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify only
 * fields of its own backend's ProcState, and no instance will look at fields
 * of other backends' ProcStates. We express this by grabbing SInvalReadLock
 * in shared mode. Note that this is not exactly the normal (read-only)
 * interpretation of a shared lock! Look closely at the interactions before
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries. It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
    SISeg      *segP;
    ProcState  *stateP;
    int         max;
    int         n;

    segP = shmInvalBuffer;
    stateP = &segP->procState[MyBackendId - 1];

    /*
     * Before starting to take locks, do a quick, unlocked test to see whether
     * there can possibly be anything to read. On a multiprocessor system,
     * it's possible that this load could migrate backwards and occur before
     * we actually enter this function, so we might miss a sinval message that
     * was just added by some other processor. But loads can't migrate
     * backwards over a preceding lock acquisition, so it should be OK. If we
     * haven't acquired a lock preventing further relevant invalidations,
     * any such occurrence is not much different from the invalidation
     * having arrived slightly later in the first place.
     */
    if (!stateP->hasMessages)
        return 0;

    LWLockAcquire(SInvalReadLock, LW_SHARED);

    /*
     * We must reset hasMessages before determining how many messages we're
     * going to read. That way, if new messages arrive after we have
     * determined how many we're reading, the flag will get reset and we'll
     * notice those messages part-way through.
     *
     * Note that, if we don't end up reading all of the messages, we had
     * better be certain to reset this flag before exiting!
     */
    stateP->hasMessages = false;

    /* Fetch current value of maxMsgNum using spinlock */
    SpinLockAcquire(&segP->msgnumLock);
    max = segP->maxMsgNum;
    SpinLockRelease(&segP->msgnumLock);

    if (stateP->resetState)
    {
        /*
         * Force reset. We can say we have dealt with any messages added
         * since the reset, as well; and that means we should clear the
         * signaled flag, too.
         */
        stateP->nextMsgNum = max;
        stateP->resetState = false;
        stateP->signaled = false;
        LWLockRelease(SInvalReadLock);
        return -1;
    }

    /*
     * Retrieve messages and advance backend's counter, until data array is
     * full or there are no more messages.
     *
     * There may be other backends that haven't read the message(s), so we
     * cannot delete them here. SICleanupQueue() will eventually remove them
     * from the queue.
     */
    n = 0;
    while (n < datasize && stateP->nextMsgNum < max)
    {
        data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
        stateP->nextMsgNum++;
    }

    /*
     * If we have caught up completely, reset our "signaled" flag so that
     * we'll get another signal if we fall behind again.
     *
     * If we haven't caught up completely, reset the hasMessages flag so that
     * we see the remaining messages next time.
     */
    if (stateP->nextMsgNum >= max)
        stateP->signaled = false;
    else
        stateP->hasMessages = true;

    LWLockRelease(SInvalReadLock);
    return n;
}
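
/*
 * A hypothetical consumer loop, shown as a sketch (the real counterpart is
 * ReceiveSharedInvalidMessages() in src/backend/utils/cache/inval.c, which
 * dispatches through caller-supplied callbacks):
 *
 *      SharedInvalidationMessage msgs[32];
 *      int         n;
 *
 *      do
 *      {
 *          n = SIGetDataEntries(msgs, lengthof(msgs));
 *          if (n < 0)
 *              InvalidateSystemCaches();   -- reset: discard cached state
 *          else
 *          {
 *              int         i;
 *
 *              for (i = 0; i < n; i++)
 *                  ... process msgs[i] ...
 *          }
 *      } while (n == lengthof(msgs));
 *
 * Per the header comment, a return value smaller than the array size means
 * the queue is (momentarily) drained, so the loop can stop.
 */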

/*
 * SICleanupQueue
 *      Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is true if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind. We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit. Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
    SISeg      *segP = shmInvalBuffer;
    int         min,
                minsig,
                lowbound,
                numMsgs,
                i;
    ProcState  *needSig = NULL;

    /* Lock out all writers and readers */
    if (!callerHasWriteLock)
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

    /*
     * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     * furthest-back backend that needs signaling (if any), and reset any
     * backends that are too far back. Note that because we ignore sendOnly
     * backends here it is possible for them to keep sending messages without
     * a problem even when they are the only active backend.
     */
    min = segP->maxMsgNum;
    minsig = min - SIG_THRESHOLD;
    lowbound = min - MAXNUMMESSAGES + minFree;

    for (i = 0; i < segP->lastBackend; i++)
    {
        ProcState  *stateP = &segP->procState[i];
        int         n = stateP->nextMsgNum;

        /* Ignore if inactive or already in reset state */
        if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
            continue;

        /*
         * If we must free some space and this backend is preventing it, force
         * him into reset state and then ignore until he catches up.
         */
        if (n < lowbound)
        {
            stateP->resetState = true;
            /* no point in signaling him ... */
            continue;
        }

        /* Track the global minimum nextMsgNum */
        if (n < min)
            min = n;

        /* Also see who's furthest back of the unsignaled backends */
        if (n < minsig && !stateP->signaled)
        {
            minsig = n;
            needSig = stateP;
        }
    }
    segP->minMsgNum = min;

    /*
     * When minMsgNum gets really large, decrement all message counters so as
     * to forestall overflow of the counters. This happens seldom enough that
     * folding it into the previous loop would be a loser.
     */
    if (min >= MSGNUMWRAPAROUND)
    {
        segP->minMsgNum -= MSGNUMWRAPAROUND;
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->lastBackend; i++)
        {
            /* we don't bother skipping inactive entries here */
            segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
        }
    }
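    /*
     * Note that because MSGNUMWRAPAROUND is a multiple of MAXNUMMESSAGES, the
     * subtraction above leaves every message's circular-buffer slot, MsgNum %
     * MAXNUMMESSAGES, unchanged; the buffer contents need not move.
     */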

    /*
     * Determine how many messages are still in the queue, and set the
     * threshold at which we should repeat SICleanupQueue().
     */
    numMsgs = segP->maxMsgNum - segP->minMsgNum;
    if (numMsgs < CLEANUP_MIN)
        segP->nextThreshold = CLEANUP_MIN;
    else
        segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
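    /*
     * Worked example: with numMsgs = 2500 and CLEANUP_QUANTUM = 256, this
     * gives (2500 / 256 + 1) * 256 = 10 * 256 = 2560, so the next cleanup
     * fires once 2560 messages are pending.
     */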

    /*
     * Lastly, signal anyone who needs a catchup interrupt. Since
     * SendProcSignal() might not be fast, we don't want to hold locks while
     * executing it.
     */
    if (needSig)
    {
        pid_t       his_pid = needSig->procPid;
        BackendId   his_backendId = (needSig - &segP->procState[0]) + 1;

        needSig->signaled = true;
        LWLockRelease(SInvalReadLock);
        LWLockRelease(SInvalWriteLock);
        elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
        SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
        if (callerHasWriteLock)
            LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    }
    else
    {
        LWLockRelease(SInvalReadLock);
        if (!callerHasWriteLock)
            LWLockRelease(SInvalWriteLock);
    }
}


/*
 * GetNextLocalTransactionId --- allocate a new LocalTransactionId
 *
 * We split VirtualTransactionIds into two parts so that it is possible
 * to allocate a new one without any contention for shared memory, except
 * for a bit of additional overhead during backend startup/shutdown.
 * The high-order part of a VirtualTransactionId is a BackendId, and the
 * low-order part is a LocalTransactionId, which we assign from a local
 * counter. To avoid the risk of a VirtualTransactionId being reused
 * within a short interval, successive procs occupying the same backend ID
 * slot should use a consecutive sequence of local IDs, which is implemented
 * by copying nextLocalTransactionId as seen above. (For example, the
 * VirtualTransactionId displayed as 7/1234 denotes local transaction ID
 * 1234 issued under backend ID 7.)
 */
LocalTransactionId
GetNextLocalTransactionId(void)
{
    LocalTransactionId result;

    /* loop to avoid returning InvalidLocalTransactionId at wraparound */
    do
    {
        result = nextLocalTransactionId++;
    } while (!LocalTransactionIdIsValid(result));

    return result;
}