/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *	  POSTGRES shared cache invalidation data manager.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/ipc/sinvaladt.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/transam.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"


/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing
 * a message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 * active backend, there is a nextMsgNum pointer indicating the next message it
 * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * backend.
 *
 * (In the current implementation, minMsgNum is a lower bound for the
 * per-backend nextMsgNum values, but it isn't rigorously kept equal to the
 * smallest nextMsgNum --- it may lag behind.  We only update it when
 * SICleanupQueue is called, and we try not to do that often.)
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries.  We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer.  If the buffer does overflow, we recover by setting the
 * "reset" flag for each backend that has fallen too far behind.  A backend
 * that is in "reset" state is ignored while determining minMsgNum.  When
 * it does finally attempt to receive inval messages, it must discard all
 * its invalidatable state, since it won't know what it missed.
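 *
 * (A worked example of the mapping, for illustration only: with
 * MAXNUMMESSAGES = 4096, MsgNums 4095, 4096, and 8191 map to buffer slots
 * 4095, 0, and 4095 respectively.  The window [minMsgNum, maxMsgNum) can
 * therefore slide forward forever without moving any data, provided it
 * never grows wider than 4096 entries.)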
 *
 * To reduce the probability of needing resets, we send a "catchup" interrupt
 * to any backend that seems to be falling unreasonably far behind.  The
 * normal behavior is that at most one such interrupt is in flight at a time;
 * when a backend completes processing a catchup interrupt, it executes
 * SICleanupQueue, which will signal the next-furthest-behind backend if
 * needed.  This avoids undue contention from multiple backends all trying
 * to catch up at once.  However, the furthest-back backend might be stuck
 * in a state where it can't catch up.  Eventually it will get reset, so it
 * won't cause any more problems for anyone but itself.  But we don't want
 * to find that a bunch of other backends are now too close to the reset
 * threshold to be saved.  So SICleanupQueue is designed to occasionally
 * send extra catchup interrupts as the queue gets fuller, to backends that
 * are far behind and haven't gotten one yet.  As long as there aren't a lot
 * of "stuck" backends, we won't need a lot of extra interrupts, since ones
 * that aren't stuck will propagate their interrupts to the next guy.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often.  It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
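 *
 * (Why a multiple works, spelled out for illustration: if MSGNUMWRAPAROUND
 * is k * MAXNUMMESSAGES, then for any message number m,
 * (m - MSGNUMWRAPAROUND) % MAXNUMMESSAGES == m % MAXNUMMESSAGES, so every
 * buffered message keeps the same slot after the subtraction.)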
 *
 * Access to the shared sinval array is protected by two locks, SInvalReadLock
 * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
 * authorizes them to modify their own ProcState but not to modify or even
 * look at anyone else's.  When we need to perform array-wide updates,
 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
 * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
 * mode) to serialize adding messages to the queue.  Note that a writer
 * can operate in parallel with one or more readers, because the writer
 * has no need to touch anyone's ProcState, except in the infrequent cases
 * when SICleanupQueue is needed.  The only point of overlap is that
 * the writer wants to change maxMsgNum while readers need to read it.
 * We deal with that by having a spinlock that readers must take for just
 * long enough to read maxMsgNum, while writers take it for just long enough
 * to write maxMsgNum.  (The exact rule is that you need the spinlock to
 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
 * spinlock to write maxMsgNum unless you are holding both locks.)
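 *
 * (Condensed, as a quick reference to the rules above: readers hold
 * SInvalReadLock shared and touch only their own ProcState; SICleanupQueue
 * holds SInvalReadLock exclusive to touch them all; writers hold
 * SInvalWriteLock exclusive to append; and maxMsgNum is read or written
 * under msgnumLock except in the cases noted just above.)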
 *
 * Note: since maxMsgNum is an int and hence presumably atomically readable/
 * writable, the spinlock might seem unnecessary.  The reason it is needed
 * is to provide a memory barrier: we need to be sure that messages written
 * to the array are actually there before maxMsgNum is increased, and that
 * readers will see that data after fetching maxMsgNum.  Multiprocessors
 * that have weak memory-ordering guarantees can fail without the memory
 * barrier instructions that are included in the spinlock sequences.
 */


/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 * Must be a power of 2 for speed.
 *
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 * Must be a multiple of MAXNUMMESSAGES.  Should be large.
 *
 * CLEANUP_MIN: the minimum number of messages that must be in the buffer
 * before we bother to call SICleanupQueue.
 *
 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
 * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
 *
 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
 * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
 *
 * WRITE_QUANTUM: the max number of messages to push into the buffer per
 * iteration of SIInsertDataEntries.  Noncritical but should be less than
 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
 * per iteration.
 */

#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 4096)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64
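
/*
 * With the settings above (derived values shown for illustration): the ring
 * holds 4096 messages, cleanup is first considered at 2048 pending messages
 * and roughly every 256 thereafter, a backend more than 2048 messages behind
 * becomes a candidate for a catchup interrupt, and writers append at most 64
 * messages per lock acquisition.
 */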

/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
	/* procPid is zero in an inactive ProcState array entry. */
	pid_t		procPid;		/* PID of backend, for signaling */
	PGPROC	   *proc;			/* PGPROC of backend */
	/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
	int			nextMsgNum;		/* next message number to read */
	bool		resetState;		/* backend needs to reset its state */
	bool		signaled;		/* backend has been sent catchup signal */
	bool		hasMessages;	/* backend has unread messages */

	/*
	 * Backend only sends invalidations, never receives them.  This only
	 * makes sense for Startup process during recovery because it doesn't
	 * maintain a relcache, yet it fires inval messages to allow query
	 * backends to see schema changes.
	 */
	bool		sendOnly;		/* backend only sends, never receives */

	/*
	 * Next LocalTransactionId to use for each idle backend slot.  We keep
	 * this here because it is indexed by BackendId and it is convenient to
	 * copy the value to and from local memory when MyBackendId is set.  It's
	 * meaningless in an active ProcState entry.
	 */
	LocalTransactionId nextLXID;
} ProcState;

/* Shared cache invalidation memory segment */
typedef struct SISeg
{
	/*
	 * General state information
	 */
	int			minMsgNum;		/* oldest message still needed */
	int			maxMsgNum;		/* next message number to be assigned */
	int			nextThreshold;	/* # of messages to call SICleanupQueue */
	int			lastBackend;	/* index of last active procState entry, +1 */
	int			maxBackends;	/* size of procState array */

	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */

	/*
	 * Circular buffer holding shared-inval messages
	 */
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];

	/*
	 * Per-backend invalidation state info (has MaxBackends entries).
	 */
	ProcState	procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;

static SISeg *shmInvalBuffer;	/* pointer to the shared inval buffer */


static LocalTransactionId nextLocalTransactionId;

static void CleanupInvalidationState(int status, Datum arg);


/*
 * SInvalShmemSize --- return shared-memory space needed
 */
Size
SInvalShmemSize(void)
{
	Size		size;

	size = offsetof(SISeg, procState);
	size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));

	return size;
}

/*
 * CreateSharedInvalidationState
 *		Create and initialize the SI message buffer
 */
void
CreateSharedInvalidationState(void)
{
	int			i;
	bool		found;

	/* Allocate space in shared memory */
	shmInvalBuffer = (SISeg *)
		ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
	if (found)
		return;

	/* Clear message counters, save size of procState array, init spinlock */
	shmInvalBuffer->minMsgNum = 0;
	shmInvalBuffer->maxMsgNum = 0;
	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
	shmInvalBuffer->lastBackend = 0;
	shmInvalBuffer->maxBackends = MaxBackends;
	SpinLockInit(&shmInvalBuffer->msgnumLock);

	/* The buffer[] array is initially all unused, so we need not fill it */

	/* Mark all backends inactive, and initialize nextLXID */
	for (i = 0; i < shmInvalBuffer->maxBackends; i++)
	{
		shmInvalBuffer->procState[i].procPid = 0;	/* inactive */
		shmInvalBuffer->procState[i].proc = NULL;
		shmInvalBuffer->procState[i].nextMsgNum = 0;	/* meaningless */
		shmInvalBuffer->procState[i].resetState = false;
		shmInvalBuffer->procState[i].signaled = false;
		shmInvalBuffer->procState[i].hasMessages = false;
		shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
	}
}

/*
 * SharedInvalBackendInit
 *		Initialize a new backend to operate on the sinval buffer
 */
void
SharedInvalBackendInit(bool sendOnly)
{
	int			index;
	ProcState  *stateP = NULL;
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * This can run in parallel with read operations, but not with write
	 * operations, since SIInsertDataEntries relies on lastBackend to set
	 * hasMessages appropriately.
	 */
	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	/* Look for a free entry in the procState array */
	for (index = 0; index < segP->lastBackend; index++)
	{
		if (segP->procState[index].procPid == 0)	/* inactive slot? */
		{
			stateP = &segP->procState[index];
			break;
		}
	}

	if (stateP == NULL)
	{
		if (segP->lastBackend < segP->maxBackends)
		{
			stateP = &segP->procState[segP->lastBackend];
			Assert(stateP->procPid == 0);
			segP->lastBackend++;
		}
		else
		{
			/*
			 * out of procState slots: MaxBackends exceeded -- report normally
			 */
			MyBackendId = InvalidBackendId;
			LWLockRelease(SInvalWriteLock);
			ereport(FATAL,
					(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
					 errmsg("sorry, too many clients already")));
		}
	}

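	/*
	 * Backend IDs are 1-based: slot procState[i] corresponds to backend ID
	 * i + 1, which is why other routines index the array with
	 * procState[MyBackendId - 1].
	 */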
	MyBackendId = (stateP - &segP->procState[0]) + 1;

	/* Advertise assigned backend ID in MyProc */
	MyProc->backendId = MyBackendId;

	/* Fetch next local transaction ID into local memory */
	nextLocalTransactionId = stateP->nextLXID;

	/* mark myself active, with all extant messages already read */
	stateP->procPid = MyProcPid;
	stateP->proc = MyProc;
	stateP->nextMsgNum = segP->maxMsgNum;
	stateP->resetState = false;
	stateP->signaled = false;
	stateP->hasMessages = false;
	stateP->sendOnly = sendOnly;

	LWLockRelease(SInvalWriteLock);

	/* register exit routine to mark my entry inactive at exit */
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

	elog(DEBUG4, "my backend ID is %d", MyBackendId);
}

/*
 * CleanupInvalidationState
 *		Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown.
 *
 * arg is really of type "SISeg*".
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
	SISeg	   *segP = (SISeg *) DatumGetPointer(arg);
	ProcState  *stateP;
	int			i;

	Assert(PointerIsValid(segP));

	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	stateP = &segP->procState[MyBackendId - 1];

	/* Update next local transaction ID for next holder of this backendID */
	stateP->nextLXID = nextLocalTransactionId;

	/* Mark myself inactive */
	stateP->procPid = 0;
	stateP->proc = NULL;
	stateP->nextMsgNum = 0;
	stateP->resetState = false;
	stateP->signaled = false;

	/* Recompute index of last active backend */
	for (i = segP->lastBackend; i > 0; i--)
	{
		if (segP->procState[i - 1].procPid != 0)
			break;
	}
	segP->lastBackend = i;

	LWLockRelease(SInvalWriteLock);
}

/*
 * BackendIdGetProc
 *		Get the PGPROC structure for a backend, given the backend ID.
 *		The result may be out of date arbitrarily quickly, so the caller
 *		must be careful about how this information is used.  NULL is
 *		returned if the backend is not active.
 */
PGPROC *
BackendIdGetProc(int backendID)
{
	PGPROC	   *result = NULL;
	SISeg	   *segP = shmInvalBuffer;

	/* Need to lock out additions/removals of backends */
	LWLockAcquire(SInvalWriteLock, LW_SHARED);

	if (backendID > 0 && backendID <= segP->lastBackend)
	{
		ProcState  *stateP = &segP->procState[backendID - 1];

		result = stateP->proc;
	}

	LWLockRelease(SInvalWriteLock);

	return result;
}

/*
 * BackendIdGetTransactionIds
 *		Get the xid and xmin of the backend.  The result may be out of date
 *		arbitrarily quickly, so the caller must be careful about how this
 *		information is used.
 */
void
BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
{
	SISeg	   *segP = shmInvalBuffer;

	*xid = InvalidTransactionId;
	*xmin = InvalidTransactionId;

	/* Need to lock out additions/removals of backends */
	LWLockAcquire(SInvalWriteLock, LW_SHARED);

	if (backendID > 0 && backendID <= segP->lastBackend)
	{
		ProcState  *stateP = &segP->procState[backendID - 1];
		PGPROC	   *proc = stateP->proc;

		if (proc != NULL)
		{
			PGXACT	   *xact = &ProcGlobal->allPgXact[proc->pgprocno];

			*xid = xact->xid;
			*xmin = xact->xmin;
		}
	}

	LWLockRelease(SInvalWriteLock);
}

/*
 * SIInsertDataEntries
 *		Add new invalidation message(s) to the buffer.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * N can be arbitrarily large.  We divide the work into groups of no more
	 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock
	 * for an unreasonably long time.  (This is not so much because we care
	 * about letting in other writers, as that some just-caught-up backend
	 * might be trying to do SICleanupQueue to pass on its signal, and we
	 * don't want it to have to wait a long time.)  Also, we need to consider
	 * calling SICleanupQueue every so often.
	 */
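	/*
	 * (For illustration: n = 1000 would be handled as 15 chunks of 64
	 * messages followed by one chunk of 40, reacquiring SInvalWriteLock for
	 * each chunk.)
	 */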
	while (n > 0)
	{
		int			nthistime = Min(n, WRITE_QUANTUM);
		int			numMsgs;
		int			max;
		int			i;

		n -= nthistime;

		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

		/*
		 * If the buffer is full, we *must* acquire some space.  Clean the
		 * queue and reset anyone who is preventing space from being freed.
		 * Otherwise, clean the queue only when it's exceeded the next
		 * fullness threshold.  We have to loop and recheck the buffer state
		 * after any call of SICleanupQueue.
		 */
		for (;;)
		{
			numMsgs = segP->maxMsgNum - segP->minMsgNum;
			if (numMsgs + nthistime > MAXNUMMESSAGES ||
				numMsgs >= segP->nextThreshold)
				SICleanupQueue(true, nthistime);
			else
				break;
		}

		/*
		 * Insert new message(s) into proper slot of circular buffer
		 */
		max = segP->maxMsgNum;
		while (nthistime-- > 0)
		{
			segP->buffer[max % MAXNUMMESSAGES] = *data++;
			max++;
		}

		/* Update current value of maxMsgNum using spinlock */
		SpinLockAcquire(&segP->msgnumLock);
		segP->maxMsgNum = max;
		SpinLockRelease(&segP->msgnumLock);

		/*
		 * Now that the maxMsgNum change is globally visible, we give
		 * everyone a swift kick to make sure they read the newly added
		 * messages.  Releasing SInvalWriteLock will enforce a full memory
		 * barrier, so these (unlocked) changes will be committed to memory
		 * before we exit the function.
		 */
		for (i = 0; i < segP->lastBackend; i++)
		{
			ProcState  *stateP = &segP->procState[i];

			stateP->hasMessages = true;
		}

		LWLockRelease(SInvalWriteLock);
	}
}

/*
 * SIGetDataEntries
 *		get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *	0:	 no SI message available
 *	n>0: next n SI messages have been extracted into data[]
 *	-1:	 SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
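 *
 * An illustrative (hypothetical, not the actual in-tree caller) usage
 * sketch, showing how these return conventions are meant to be consumed:
 *
 *		SharedInvalidationMessage msgs[32];
 *		int			n;
 *
 *		do
 *		{
 *			n = SIGetDataEntries(msgs, lengthof(msgs));
 *			if (n < 0)
 *				... discard all invalidatable state (reset) ...
 *			else
 *				... apply msgs[0 .. n-1] ...
 *		} while (n == lengthof(msgs));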
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify
 * only fields of its own backend's ProcState, and no instance will look at
 * fields of other backends' ProcStates.  We express this by grabbing
 * SInvalReadLock in shared mode.  Note that this is not exactly the normal
 * (read-only) interpretation of a shared lock!  Look closely at the
 * interactions before allowing SInvalReadLock to be grabbed in shared mode
 * for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
	SISeg	   *segP;
	ProcState  *stateP;
	int			max;
	int			n;

	segP = shmInvalBuffer;
	stateP = &segP->procState[MyBackendId - 1];

	/*
	 * Before starting to take locks, do a quick, unlocked test to see
	 * whether there can possibly be anything to read.  On a multiprocessor
	 * system, it's possible that this load could migrate backwards and occur
	 * before we actually enter this function, so we might miss a sinval
	 * message that was just added by some other processor.  But they can't
	 * migrate backwards over a preceding lock acquisition, so it should be
	 * OK.  If we haven't acquired a lock preventing against further relevant
	 * invalidations, any such occurrence is not much different than if the
	 * invalidation had arrived slightly later in the first place.
	 */
	if (!stateP->hasMessages)
		return 0;

	LWLockAcquire(SInvalReadLock, LW_SHARED);

	/*
	 * We must reset hasMessages before determining how many messages we're
	 * going to read.  That way, if new messages arrive after we have
	 * determined how many we're reading, the flag will get reset and we'll
	 * notice those messages part-way through.
	 *
	 * Note that, if we don't end up reading all of the messages, we had
	 * better be certain to reset this flag before exiting!
	 */
	stateP->hasMessages = false;

	/* Fetch current value of maxMsgNum using spinlock */
	SpinLockAcquire(&segP->msgnumLock);
	max = segP->maxMsgNum;
	SpinLockRelease(&segP->msgnumLock);

	if (stateP->resetState)
	{
		/*
		 * Force reset.  We can say we have dealt with any messages added
		 * since the reset, as well; and that means we should clear the
		 * signaled flag, too.
		 */
		stateP->nextMsgNum = max;
		stateP->resetState = false;
		stateP->signaled = false;
		LWLockRelease(SInvalReadLock);
		return -1;
	}

	/*
	 * Retrieve messages and advance backend's counter, until data array is
	 * full or there are no more messages.
	 *
	 * There may be other backends that haven't read the message(s), so we
	 * cannot delete them here.  SICleanupQueue() will eventually remove them
	 * from the queue.
	 */
	n = 0;
	while (n < datasize && stateP->nextMsgNum < max)
	{
		data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
		stateP->nextMsgNum++;
	}

	/*
	 * If we have caught up completely, reset our "signaled" flag so that
	 * we'll get another signal if we fall behind again.
	 *
	 * If we haven't caught up completely, reset the hasMessages flag so that
	 * we see the remaining messages next time.
	 */
	if (stateP->nextMsgNum >= max)
		stateP->signaled = false;
	else
		stateP->hasMessages = true;

	LWLockRelease(SInvalReadLock);
	return n;
}

/*
 * SICleanupQueue
 *		Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is true if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
	SISeg	   *segP = shmInvalBuffer;
	int			min,
				minsig,
				lowbound,
				numMsgs,
				i;
	ProcState  *needSig = NULL;

	/* Lock out all writers and readers */
	if (!callerHasWriteLock)
		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

	/*
	 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify
	 * the furthest-back backend that needs signaling (if any), and reset any
	 * backends that are too far back.  Note that because we ignore sendOnly
	 * backends here it is possible for them to keep sending messages without
	 * a problem even when they are the only active backend.
	 */
	min = segP->maxMsgNum;
	minsig = min - SIG_THRESHOLD;
	lowbound = min - MAXNUMMESSAGES + minFree;

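	/*
	 * (Illustrative numbers, assuming the default constants: with
	 * MAXNUMMESSAGES = 4096, SIG_THRESHOLD = 2048, and minFree = 64, a
	 * backend more than 4032 messages behind maxMsgNum is forced into reset
	 * state, and the furthest-back unsignaled backend more than 2048 behind
	 * is chosen for a catchup interrupt.)
	 */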
	for (i = 0; i < segP->lastBackend; i++)
	{
		ProcState  *stateP = &segP->procState[i];
		int			n = stateP->nextMsgNum;

		/* Ignore if inactive or already in reset state */
		if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
			continue;

		/*
		 * If we must free some space and this backend is preventing it,
		 * force him into reset state and then ignore until he catches up.
		 */
		if (n < lowbound)
		{
			stateP->resetState = true;
			/* no point in signaling him ... */
			continue;
		}

		/* Track the global minimum nextMsgNum */
		if (n < min)
			min = n;

		/* Also see who's furthest back of the unsignaled backends */
		if (n < minsig && !stateP->signaled)
		{
			minsig = n;
			needSig = stateP;
		}
	}
	segP->minMsgNum = min;

	/*
	 * When minMsgNum gets really large, decrement all message counters so as
	 * to forestall overflow of the counters.  This happens seldom enough
	 * that folding it into the previous loop would be a loser.
	 */
	if (min >= MSGNUMWRAPAROUND)
	{
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
		for (i = 0; i < segP->lastBackend; i++)
		{
			/* we don't bother skipping inactive entries here */
			segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
		}
	}

	/*
	 * Determine how many messages are still in the queue, and set the
	 * threshold at which we should repeat SICleanupQueue().
	 */
	numMsgs = segP->maxMsgNum - segP->minMsgNum;
	if (numMsgs < CLEANUP_MIN)
		segP->nextThreshold = CLEANUP_MIN;
	else
		segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
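
	/*
	 * (Worked example of the threshold formula, for illustration: with 2500
	 * messages still queued and CLEANUP_QUANTUM = 256, nextThreshold becomes
	 * (2500 / 256 + 1) * 256 = 2560, so cleanup is reconsidered after about
	 * another 60 insertions.)
	 */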

	/*
	 * Lastly, signal anyone who needs a catchup interrupt.  Since
	 * SendProcSignal() might not be fast, we don't want to hold locks while
	 * executing it.
	 */
	if (needSig)
	{
		pid_t		his_pid = needSig->procPid;
		BackendId	his_backendId = (needSig - &segP->procState[0]) + 1;

		needSig->signaled = true;
		LWLockRelease(SInvalReadLock);
		LWLockRelease(SInvalWriteLock);
		elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
		SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
		if (callerHasWriteLock)
			LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	}
	else
	{
		LWLockRelease(SInvalReadLock);
		if (!callerHasWriteLock)
			LWLockRelease(SInvalWriteLock);
	}
}


/*
 * GetNextLocalTransactionId --- allocate a new LocalTransactionId
 *
 * We split VirtualTransactionIds into two parts so that it is possible
 * to allocate a new one without any contention for shared memory, except
 * for a bit of additional overhead during backend startup/shutdown.
 * The high-order part of a VirtualTransactionId is a BackendId, and the
 * low-order part is a LocalTransactionId, which we assign from a local
 * counter.  To avoid the risk of a VirtualTransactionId being reused
 * within a short interval, successive procs occupying the same backend ID
 * slot should use a consecutive sequence of local IDs, which is implemented
 * by copying nextLocalTransactionId as seen above.
 */
LocalTransactionId
GetNextLocalTransactionId(void)
{
	LocalTransactionId result;

	/* loop to avoid returning InvalidLocalTransactionId at wraparound */
	do
	{
		result = nextLocalTransactionId++;
	} while (!LocalTransactionIdIsValid(result));

	return result;
}