/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *	  POSTGRES shared cache invalidation data manager.
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/ipc/sinvaladt.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "access/transam.h"
#include "miscadmin.h"
#include "storage/backendid.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"


/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing
 * a message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 * active backend, there is a nextMsgNum pointer indicating the next message
 * it needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * backend.
 *
 * (In the current implementation, minMsgNum is a lower bound for the
 * per-process nextMsgNum values, but it isn't rigorously kept equal to the
 * smallest nextMsgNum --- it may lag behind.  We only update it when
 * SICleanupQueue is called, and we try not to do that often.)
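 *
 * (Worked example, for illustration only: with minMsgNum = 100 and
 * maxMsgNum = 103, messages 100..102 are pending; a backend whose
 * nextMsgNum is 101 still has 101 and 102 to read, while one whose
 * nextMsgNum is 103 is fully caught up.)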
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries.  We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer.  If the buffer does overflow, we recover by setting the
 * "reset" flag for each backend that has fallen too far behind.  A backend
 * that is in "reset" state is ignored while determining minMsgNum.  When
 * it does finally attempt to receive inval messages, it must discard all
 * its invalidatable state, since it won't know what it missed.
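 *
 * (For illustration: because MAXNUMMESSAGES is a power of 2, the modulo
 * reduces to a mask, MsgNum & (MAXNUMMESSAGES - 1).  With the default
 * MAXNUMMESSAGES of 4096, message numbers 5, 4101, and 8197 all map to
 * buffer slot 5; reusing a slot is safe precisely because we never let
 * maxMsgNum - minMsgNum exceed MAXNUMMESSAGES.)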
 *
 * To reduce the probability of needing resets, we send a "catchup" interrupt
 * to any backend that seems to be falling unreasonably far behind.  The
 * normal behavior is that at most one such interrupt is in flight at a time;
 * when a backend completes processing a catchup interrupt, it executes
 * SICleanupQueue, which will signal the next-furthest-behind backend if
 * needed.  This avoids undue contention from multiple backends all trying
 * to catch up at once.  However, the furthest-back backend might be stuck
 * in a state where it can't catch up.  Eventually it will get reset, so it
 * won't cause any more problems for anyone but itself.  But we don't want
 * to find that a bunch of other backends are now too close to the reset
 * threshold to be saved.  So SICleanupQueue is designed to occasionally
 * send extra catchup interrupts as the queue gets fuller, to backends that
 * are far behind and haven't gotten one yet.  As long as there aren't a lot
 * of "stuck" backends, we won't need a lot of extra interrupts, since ones
 * that aren't stuck will propagate their interrupts to the next guy.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often.  It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
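 *
 * (Why a multiple of MAXNUMMESSAGES suffices, for illustration: if
 * MSGNUMWRAPAROUND = k * MAXNUMMESSAGES, then for any message number m,
 * (m - MSGNUMWRAPAROUND) % MAXNUMMESSAGES == m % MAXNUMMESSAGES, so every
 * pending message keeps its circular-buffer slot after the subtraction.)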
 *
 * Access to the shared sinval array is protected by two locks, SInvalReadLock
 * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
 * authorizes them to modify their own ProcState but not to modify or even
 * look at anyone else's.  When we need to perform array-wide updates,
 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
 * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
 * mode) to serialize adding messages to the queue.  Note that a writer
 * can operate in parallel with one or more readers, because the writer
 * has no need to touch anyone's ProcState, except in the infrequent cases
 * when SICleanupQueue is needed.  The only point of overlap is that
 * the writer wants to change maxMsgNum while readers need to read it.
 * We deal with that by having a spinlock that readers must take for just
 * long enough to read maxMsgNum, while writers take it for just long enough
 * to write maxMsgNum.  (The exact rule is that you need the spinlock to
 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
 * spinlock to write maxMsgNum unless you are holding both locks.)
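 *
 * (The same rule as a table, for illustration:
 *
 *		access to maxMsgNum		locks already held		spinlock needed?
 *		-------------------		------------------		----------------
 *		read					SInvalWriteLock			no
 *		read					anything less			yes
 *		write					both locks, exclusive	no
 *		write					anything less			yes
 * )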
 *
 * Note: since maxMsgNum is an int and hence presumably atomically readable/
 * writable, the spinlock might seem unnecessary.  The reason it is needed
 * is to provide a memory barrier: we need to be sure that messages written
 * to the array are actually there before maxMsgNum is increased, and that
 * readers will see that data after fetching maxMsgNum.  Multiprocessors
 * that have weak memory-ordering guarantees can fail without the memory
 * barrier instructions that are included in the spinlock sequences.
 */


/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 * Must be a power of 2 for speed.
 *
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 * Must be a multiple of MAXNUMMESSAGES.  Should be large.
 *
 * CLEANUP_MIN: the minimum number of messages that must be in the buffer
 * before we bother to call SICleanupQueue.
 *
 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
 * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
 *
 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
 * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
 *
 * WRITE_QUANTUM: the max number of messages to push into the buffer per
 * iteration of SIInsertDataEntries.  Noncritical but should be less than
 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
 * per iteration.
 */

#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64
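
/*
 * With the settings above, the derived values are (illustrative arithmetic
 * only): MSGNUMWRAPAROUND = 4096 * 262144 = 2^30, CLEANUP_MIN = 2048,
 * CLEANUP_QUANTUM = 256, and SIG_THRESHOLD = 2048.  So the counters are
 * folded back roughly once per billion messages, and a backend is first
 * signaled once it trails the queue head by 2048 messages.
 */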

/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
	/* procPid is zero in an inactive ProcState array entry. */
	pid_t		procPid;		/* PID of backend, for signaling */
	PGPROC	   *proc;			/* PGPROC of backend */
	/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
	int			nextMsgNum;		/* next message number to read */
	bool		resetState;		/* backend needs to reset its state */
	bool		signaled;		/* backend has been sent catchup signal */
	bool		hasMessages;	/* backend has unread messages */

	/*
	 * Backend only sends invalidations, never receives them. This only makes
	 * sense for Startup process during recovery because it doesn't maintain a
	 * relcache, yet it fires inval messages to allow query backends to see
	 * schema changes.
	 */
	bool		sendOnly;		/* backend only sends, never receives */

	/*
	 * Next LocalTransactionId to use for each idle backend slot.  We keep
	 * this here because it is indexed by BackendId and it is convenient to
	 * copy the value to and from local memory when MyBackendId is set.  It's
	 * meaningless in an active ProcState entry.
	 */
	LocalTransactionId nextLXID;
} ProcState;

/* Shared cache invalidation memory segment */
typedef struct SISeg
{
	/*
	 * General state information
	 */
	int			minMsgNum;		/* oldest message still needed */
	int			maxMsgNum;		/* next message number to be assigned */
	int			nextThreshold;	/* # of messages to call SICleanupQueue */
	int			lastBackend;	/* index of last active procState entry, +1 */
	int			maxBackends;	/* size of procState array */

	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */

	/*
	 * Circular buffer holding shared-inval messages
	 */
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];

	/*
	 * Per-backend invalidation state info (has MaxBackends entries).
	 */
	ProcState	procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;

static SISeg *shmInvalBuffer;	/* pointer to the shared inval buffer */


static LocalTransactionId nextLocalTransactionId;

static void CleanupInvalidationState(int status, Datum arg);


/*
 * SInvalShmemSize --- return shared-memory space needed
 */
Size
SInvalShmemSize(void)
{
	Size		size;

	size = offsetof(SISeg, procState);
	size = add_size(size, mul_size(sizeof(ProcState), MaxBackends));

	return size;
}

/*
 * CreateSharedInvalidationState
 *		Create and initialize the SI message buffer
 */
void
CreateSharedInvalidationState(void)
{
	int			i;
	bool		found;

	/* Allocate space in shared memory */
	shmInvalBuffer = (SISeg *)
		ShmemInitStruct("shmInvalBuffer", SInvalShmemSize(), &found);
	if (found)
		return;

	/* Clear message counters, save size of procState array, init spinlock */
	shmInvalBuffer->minMsgNum = 0;
	shmInvalBuffer->maxMsgNum = 0;
	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
	shmInvalBuffer->lastBackend = 0;
	shmInvalBuffer->maxBackends = MaxBackends;
	SpinLockInit(&shmInvalBuffer->msgnumLock);

	/* The buffer[] array is initially all unused, so we need not fill it */

	/* Mark all backends inactive, and initialize nextLXID */
	for (i = 0; i < shmInvalBuffer->maxBackends; i++)
	{
		shmInvalBuffer->procState[i].procPid = 0;	/* inactive */
		shmInvalBuffer->procState[i].proc = NULL;
		shmInvalBuffer->procState[i].nextMsgNum = 0;	/* meaningless */
		shmInvalBuffer->procState[i].resetState = false;
		shmInvalBuffer->procState[i].signaled = false;
		shmInvalBuffer->procState[i].hasMessages = false;
		shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
	}
}

/*
 * SharedInvalBackendInit
 *		Initialize a new backend to operate on the sinval buffer
 */
void
SharedInvalBackendInit(bool sendOnly)
{
	int			index;
	ProcState  *stateP = NULL;
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * This can run in parallel with read operations, but not with write
	 * operations, since SIInsertDataEntries relies on lastBackend to set
	 * hasMessages appropriately.
	 */
	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	/* Look for a free entry in the procState array */
	for (index = 0; index < segP->lastBackend; index++)
	{
		if (segP->procState[index].procPid == 0)	/* inactive slot? */
		{
			stateP = &segP->procState[index];
			break;
		}
	}

	if (stateP == NULL)
	{
		if (segP->lastBackend < segP->maxBackends)
		{
			stateP = &segP->procState[segP->lastBackend];
			Assert(stateP->procPid == 0);
			segP->lastBackend++;
		}
		else
		{
			/*
			 * out of procState slots: MaxBackends exceeded -- report normally
			 */
			MyBackendId = InvalidBackendId;
			LWLockRelease(SInvalWriteLock);
			ereport(FATAL,
					(errcode(ERRCODE_TOO_MANY_CONNECTIONS),
					 errmsg("sorry, too many clients already")));
		}
	}

	MyBackendId = (stateP - &segP->procState[0]) + 1;

	/* Advertise assigned backend ID in MyProc */
	MyProc->backendId = MyBackendId;

	/* Fetch next local transaction ID into local memory */
	nextLocalTransactionId = stateP->nextLXID;

	/* mark myself active, with all extant messages already read */
	stateP->procPid = MyProcPid;
	stateP->proc = MyProc;
	stateP->nextMsgNum = segP->maxMsgNum;
	stateP->resetState = false;
	stateP->signaled = false;
	stateP->hasMessages = false;
	stateP->sendOnly = sendOnly;

	LWLockRelease(SInvalWriteLock);

	/* register exit routine to mark my entry inactive at exit */
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));

	elog(DEBUG4, "my backend ID is %d", MyBackendId);
}
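
/*
 * Illustrative note (not used by the code): backend IDs are simply 1-based
 * indexes into the procState array, so the two directions of the mapping
 * used throughout this file are
 *
 *		MyBackendId = (stateP - &segP->procState[0]) + 1;
 *		stateP = &segP->procState[MyBackendId - 1];
 */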

/*
 * CleanupInvalidationState
 *		Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown.
 *
 * arg is really of type "SISeg*".
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
	SISeg	   *segP = (SISeg *) DatumGetPointer(arg);
	ProcState  *stateP;
	int			i;

	Assert(PointerIsValid(segP));

	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	stateP = &segP->procState[MyBackendId - 1];

	/* Update next local transaction ID for next holder of this backendID */
	stateP->nextLXID = nextLocalTransactionId;

	/* Mark myself inactive */
	stateP->procPid = 0;
	stateP->proc = NULL;
	stateP->nextMsgNum = 0;
	stateP->resetState = false;
	stateP->signaled = false;

	/* Recompute index of last active backend */
	for (i = segP->lastBackend; i > 0; i--)
	{
		if (segP->procState[i - 1].procPid != 0)
			break;
	}
	segP->lastBackend = i;

	LWLockRelease(SInvalWriteLock);
}

/*
 * BackendIdGetProc
 *		Get the PGPROC structure for a backend, given the backend ID.
 *		The result may be out of date arbitrarily quickly, so the caller
 *		must be careful about how this information is used.  NULL is
 *		returned if the backend is not active.
 */
PGPROC *
BackendIdGetProc(int backendID)
{
	PGPROC	   *result = NULL;
	SISeg	   *segP = shmInvalBuffer;

	/* Need to lock out additions/removals of backends */
	LWLockAcquire(SInvalWriteLock, LW_SHARED);

	if (backendID > 0 && backendID <= segP->lastBackend)
	{
		ProcState  *stateP = &segP->procState[backendID - 1];

		result = stateP->proc;
	}

	LWLockRelease(SInvalWriteLock);

	return result;
}

/*
 * BackendIdGetTransactionIds
 *		Get the xid and xmin of the backend.  The result may be out of date
 *		arbitrarily quickly, so the caller must be careful about how this
 *		information is used.
 */
void
BackendIdGetTransactionIds(int backendID, TransactionId *xid, TransactionId *xmin)
{
	SISeg	   *segP = shmInvalBuffer;

	*xid = InvalidTransactionId;
	*xmin = InvalidTransactionId;

	/* Need to lock out additions/removals of backends */
	LWLockAcquire(SInvalWriteLock, LW_SHARED);

	if (backendID > 0 && backendID <= segP->lastBackend)
	{
		ProcState  *stateP = &segP->procState[backendID - 1];
		PGPROC	   *proc = stateP->proc;

		if (proc != NULL)
		{
			PGXACT	   *xact = &ProcGlobal->allPgXact[proc->pgprocno];

			*xid = xact->xid;
			*xmin = xact->xmin;
		}
	}

	LWLockRelease(SInvalWriteLock);
}

/*
 * SIInsertDataEntries
 *		Add new invalidation message(s) to the buffer.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * N can be arbitrarily large.  We divide the work into groups of no more
	 * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
	 * an unreasonably long time.  (This is not so much because we care about
	 * letting in other writers, as that some just-caught-up backend might be
	 * trying to do SICleanupQueue to pass on its signal, and we don't want it
	 * to have to wait a long time.)  Also, we need to consider calling
	 * SICleanupQueue every so often.
	 */
	while (n > 0)
	{
		int			nthistime = Min(n, WRITE_QUANTUM);
		int			numMsgs;
		int			max;
		int			i;

		n -= nthistime;

		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

		/*
		 * If the buffer is full, we *must* acquire some space.  Clean the
		 * queue and reset anyone who is preventing space from being freed.
		 * Otherwise, clean the queue only when it's exceeded the next
		 * fullness threshold.  We have to loop and recheck the buffer state
		 * after any call of SICleanupQueue.
		 */
		for (;;)
		{
			numMsgs = segP->maxMsgNum - segP->minMsgNum;
			if (numMsgs + nthistime > MAXNUMMESSAGES ||
				numMsgs >= segP->nextThreshold)
				SICleanupQueue(true, nthistime);
			else
				break;
		}

		/*
		 * Insert new message(s) into proper slot of circular buffer
		 */
		max = segP->maxMsgNum;
		while (nthistime-- > 0)
		{
			segP->buffer[max % MAXNUMMESSAGES] = *data++;
			max++;
		}

		/* Update current value of maxMsgNum using spinlock */
		SpinLockAcquire(&segP->msgnumLock);
		segP->maxMsgNum = max;
		SpinLockRelease(&segP->msgnumLock);

		/*
		 * Now that the maxMsgNum change is globally visible, we give everyone
		 * a swift kick to make sure they read the newly added messages.
		 * Releasing SInvalWriteLock will enforce a full memory barrier, so
		 * these (unlocked) changes will be committed to memory before we exit
		 * the function.
		 */
		for (i = 0; i < segP->lastBackend; i++)
		{
			ProcState  *stateP = &segP->procState[i];

			stateP->hasMessages = true;
		}

		LWLockRelease(SInvalWriteLock);
	}
}
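
/*
 * Illustrative caller sketch (a minimal sketch, not the canonical call
 * site; cf. SendSharedInvalidMessages in sinval.c): a sender collects its
 * pending invalidations into a local array and hands the whole array to
 * SIInsertDataEntries, which does the WRITE_QUANTUM chunking itself:
 *
 *		SharedInvalidationMessage msgs[64];
 *		int			nmsgs = 0;
 *
 *		... fill msgs[0 .. nmsgs-1] ...
 *		SIInsertDataEntries(msgs, nmsgs);
 */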

/*
 * SIGetDataEntries
 *		get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *	0:	 no SI message available
 *	n>0: next n SI messages have been extracted into data[]
 *	-1:	 SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify only
 * fields of its own backend's ProcState, and no instance will look at fields
 * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
 * in shared mode.  Note that this is not exactly the normal (read-only)
 * interpretation of a shared lock!  Look closely at the interactions before
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
	SISeg	   *segP;
	ProcState  *stateP;
	int			max;
	int			n;

	segP = shmInvalBuffer;
	stateP = &segP->procState[MyBackendId - 1];

	/*
	 * Before starting to take locks, do a quick, unlocked test to see whether
	 * there can possibly be anything to read.  On a multiprocessor system,
	 * it's possible that this load could migrate backwards and occur before
	 * we actually enter this function, so we might miss a sinval message that
	 * was just added by some other processor.  But they can't migrate
	 * backwards over a preceding lock acquisition, so it should be OK.  If we
	 * haven't acquired a lock preventing further relevant invalidations, any
	 * such occurrence is not much different than if the invalidation had
	 * arrived slightly later in the first place.
	 */
	if (!stateP->hasMessages)
		return 0;

	LWLockAcquire(SInvalReadLock, LW_SHARED);

	/*
	 * We must reset hasMessages before determining how many messages we're
	 * going to read.  That way, if new messages arrive after we have
	 * determined how many we're reading, the flag will get reset and we'll
	 * notice those messages part-way through.
	 *
	 * Note that, if we don't end up reading all of the messages, we had
	 * better be certain to reset this flag before exiting!
	 */
	stateP->hasMessages = false;

	/* Fetch current value of maxMsgNum using spinlock */
	SpinLockAcquire(&segP->msgnumLock);
	max = segP->maxMsgNum;
	SpinLockRelease(&segP->msgnumLock);

	if (stateP->resetState)
	{
		/*
		 * Force reset.  We can say we have dealt with any messages added
		 * since the reset, as well; and that means we should clear the
		 * signaled flag, too.
		 */
		stateP->nextMsgNum = max;
		stateP->resetState = false;
		stateP->signaled = false;
		LWLockRelease(SInvalReadLock);
		return -1;
	}

	/*
	 * Retrieve messages and advance backend's counter, until data array is
	 * full or there are no more messages.
	 *
	 * There may be other backends that haven't read the message(s), so we
	 * cannot delete them here.  SICleanupQueue() will eventually remove them
	 * from the queue.
	 */
	n = 0;
	while (n < datasize && stateP->nextMsgNum < max)
	{
		data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
		stateP->nextMsgNum++;
	}

	/*
	 * If we have caught up completely, reset our "signaled" flag so that
	 * we'll get another signal if we fall behind again.
	 *
	 * If we haven't caught up completely, reset the hasMessages flag so that
	 * we see the remaining messages next time.
	 */
	if (stateP->nextMsgNum >= max)
		stateP->signaled = false;
	else
		stateP->hasMessages = true;

	LWLockRelease(SInvalReadLock);
	return n;
}
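
/*
 * Illustrative consumer sketch (a rough rendition of what
 * ReceiveSharedInvalidMessages in sinval.c does; the two callbacks below
 * stand in for the function pointers the real caller is handed): keep
 * calling until a partial batch comes back, and treat -1 as "discard all
 * invalidatable state".
 *
 *		SharedInvalidationMessage msgs[32];
 *		int			n;
 *
 *		do
 *		{
 *			n = SIGetDataEntries(msgs, lengthof(msgs));
 *			if (n < 0)
 *				resetFunction();	// we missed messages; reset caches
 *			else
 *			{
 *				int			i;
 *
 *				for (i = 0; i < n; i++)
 *					invalFunction(&msgs[i]);
 *			}
 *		} while (n == lengthof(msgs));
 */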

/*
 * SICleanupQueue
 *		Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is true if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
	SISeg	   *segP = shmInvalBuffer;
	int			min,
				minsig,
				lowbound,
				numMsgs,
				i;
	ProcState  *needSig = NULL;

	/* Lock out all writers and readers */
	if (!callerHasWriteLock)
		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

	/*
	 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
	 * furthest-back backend that needs signaling (if any), and reset any
	 * backends that are too far back.  Note that because we ignore sendOnly
	 * backends here it is possible for them to keep sending messages without
	 * a problem even when they are the only active backend.
	 */
	min = segP->maxMsgNum;
	minsig = min - SIG_THRESHOLD;
	lowbound = min - MAXNUMMESSAGES + minFree;

	for (i = 0; i < segP->lastBackend; i++)
	{
		ProcState  *stateP = &segP->procState[i];
		int			n = stateP->nextMsgNum;

		/* Ignore if inactive or already in reset state */
		if (stateP->procPid == 0 || stateP->resetState || stateP->sendOnly)
			continue;

		/*
		 * If we must free some space and this backend is preventing it, force
		 * him into reset state and then ignore him until he catches up.
		 */
		if (n < lowbound)
		{
			stateP->resetState = true;
			/* no point in signaling him ... */
			continue;
		}

		/* Track the global minimum nextMsgNum */
		if (n < min)
			min = n;

		/* Also see who's furthest back of the unsignaled backends */
		if (n < minsig && !stateP->signaled)
		{
			minsig = n;
			needSig = stateP;
		}
	}
	segP->minMsgNum = min;

	/*
	 * When minMsgNum gets really large, decrement all message counters so as
	 * to forestall overflow of the counters.  This happens seldom enough that
	 * folding it into the previous loop would be a loser.
	 */
	if (min >= MSGNUMWRAPAROUND)
	{
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
		for (i = 0; i < segP->lastBackend; i++)
		{
			/* we don't bother skipping inactive entries here */
			segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND;
		}
	}

	/*
	 * Determine how many messages are still in the queue, and set the
	 * threshold at which we should repeat SICleanupQueue().
	 */
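	/*
	 * (Illustrative arithmetic only: with CLEANUP_QUANTUM = 256 and
	 * CLEANUP_MIN = 2048, a queue of 3000 remaining messages yields
	 * nextThreshold = (3000 / 256 + 1) * 256 = 3072, while any count under
	 * 2048 just resets the threshold to CLEANUP_MIN.)
	 */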
	numMsgs = segP->maxMsgNum - segP->minMsgNum;
	if (numMsgs < CLEANUP_MIN)
		segP->nextThreshold = CLEANUP_MIN;
	else
		segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

	/*
	 * Lastly, signal anyone who needs a catchup interrupt.  Since
	 * SendProcSignal() might not be fast, we don't want to hold locks while
	 * executing it.
	 */
	if (needSig)
	{
		pid_t		his_pid = needSig->procPid;
		BackendId	his_backendId = (needSig - &segP->procState[0]) + 1;

		needSig->signaled = true;
		LWLockRelease(SInvalReadLock);
		LWLockRelease(SInvalWriteLock);
		elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
		SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId);
		if (callerHasWriteLock)
			LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	}
	else
	{
		LWLockRelease(SInvalReadLock);
		if (!callerHasWriteLock)
			LWLockRelease(SInvalWriteLock);
	}
}


/*
 * GetNextLocalTransactionId --- allocate a new LocalTransactionId
 *
 * We split VirtualTransactionIds into two parts so that it is possible
 * to allocate a new one without any contention for shared memory, except
 * for a bit of additional overhead during backend startup/shutdown.
 * The high-order part of a VirtualTransactionId is a BackendId, and the
 * low-order part is a LocalTransactionId, which we assign from a local
 * counter.  To avoid the risk of a VirtualTransactionId being reused
 * within a short interval, successive procs occupying the same backend ID
 * slot should use a consecutive sequence of local IDs, which is implemented
 * by copying nextLocalTransactionId as seen above.
 */
LocalTransactionId
GetNextLocalTransactionId(void)
{
	LocalTransactionId result;

	/* loop to avoid returning InvalidLocalTransactionId at wraparound */
	do
	{
		result = nextLocalTransactionId++;
	} while (!LocalTransactionIdIsValid(result));

	return result;
}
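
/*
 * Illustrative sketch of how the two halves combine (cf. the
 * VirtualTransactionId struct in storage/lock.h; field names as found
 * there):
 *
 *		VirtualTransactionId vxid;
 *
 *		vxid.backendId = MyBackendId;
 *		vxid.localTransactionId = GetNextLocalTransactionId();
 */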