1/*-------------------------------------------------------------------------
2 *
3 * shm_mq.c
4 * single-reader, single-writer shared memory message queue
5 *
6 * Both the sender and the receiver must have a PGPROC; their respective
7 * process latches are used for synchronization. Only the sender may send,
8 * and only the receiver may receive. This is intended to allow a user
9 * backend to communicate with worker backends that it has registered.
10 *
11 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
12 * Portions Copyright (c) 1994, Regents of the University of California
13 *
14 * src/backend/storage/ipc/shm_mq.c
15 *
16 *-------------------------------------------------------------------------
17 */
18
19#include "postgres.h"
20
21#include "miscadmin.h"
22#include "pgstat.h"
23#include "postmaster/bgworker.h"
24#include "storage/procsignal.h"
25#include "storage/shm_mq.h"
26#include "storage/spin.h"
27
28/*
29 * This structure represents the actual queue, stored in shared memory.
30 *
31 * Some notes on synchronization:
32 *
33 * mq_receiver and mq_bytes_read can only be changed by the receiver; and
34 * mq_sender and mq_bytes_written can only be changed by the sender.
35 * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
36 * they cannot change once set, and thus may be read without a lock once this
37 * is known to be the case.
38 *
39 * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead,
40 * they are written atomically using 8-byte loads and stores. Memory barriers
41 * must be carefully used to synchronize reads and writes of these values with
42 * reads and writes of the actual data in mq_ring.
43 *
44 * mq_detached needs no locking. It can be set by either the sender or the
45 * receiver, but only ever from false to true, so redundant writes don't
46 * matter. It is important that if we set mq_detached and then set the
47 * counterparty's latch, the counterparty must be certain to see the change
48 * after waking up. Since SetLatch begins with a memory barrier and ResetLatch
49 * ends with one, this should be OK.
50 *
51 * mq_ring_size and mq_ring_offset never change after initialization, and
52 * can therefore be read without the lock.
53 *
54 * Importantly, mq_ring can be safely read and written without a lock.
55 * At any given time, the difference between mq_bytes_read and
56 * mq_bytes_written defines the number of bytes within mq_ring that contain
57 * unread data, and mq_bytes_read defines the position where those bytes
58 * begin. The sender can increase the number of unread bytes at any time,
59 * but only the receiver can give license to overwrite those bytes, by
60 * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read
61 * the unread bytes it knows to be present without the lock. Conversely,
62 * the sender can write to the unused portion of the ring buffer without
63 * the lock, because nobody else can be reading or writing those bytes. The
64 * receiver could be making more bytes unused by incrementing mq_bytes_read,
65 * but that's OK. Note that it would be unsafe for the receiver to read any
66 * data it's already marked as read, or to write any data; and it would be
67 * unsafe for the sender to reread any data after incrementing
68 * mq_bytes_written, but fortunately there's no need for any of that.
69 */
70struct shm_mq
71{
72 slock_t mq_mutex;
73 PGPROC *mq_receiver;
74 PGPROC *mq_sender;
75 pg_atomic_uint64 mq_bytes_read;
76 pg_atomic_uint64 mq_bytes_written;
77 Size mq_ring_size;
78 bool mq_detached;
79 uint8 mq_ring_offset;
80 char mq_ring[FLEXIBLE_ARRAY_MEMBER];
81};
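
/*
 * Illustration of the accounting described above, with made-up numbers.
 * Suppose mq_ring_size is 64.  Both counters only ever grow; ring positions
 * are derived by reducing them modulo the ring size:
 *
 *		uint64	rb = 96;		-- mq_bytes_read, advanced only by the receiver
 *		uint64	wb = 144;		-- mq_bytes_written, advanced only by the sender
 *		uint64	used = wb - rb;		-- 48 bytes of unread data
 *		Size	offset = rb % 64;	-- unread data begins at ring offset 32
 *
 * The receiver may read those 48 bytes without any lock, and the sender may
 * write into the remaining 64 - 48 = 16 free bytes without any lock; neither
 * can step on the other until one of the counters is advanced.
 */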
82
83/*
84 * This structure is a backend-private handle for access to a queue.
85 *
86 * mqh_queue is a pointer to the queue we've attached, and mqh_segment is
87 * an optional pointer to the dynamic shared memory segment that contains it.
88 * (If mqh_segment is provided, we register an on_dsm_detach callback to
89 * make sure we detach from the queue before detaching from DSM.)
90 *
91 * If this queue is intended to connect the current process with a background
92 * worker that started it, the user can pass a pointer to the worker handle
93 * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this
94 * is to allow us to begin sending to or receiving from that queue before the
95 * process we'll be communicating with has even been started. If it fails
96 * to start, the handle will allow us to notice that and fail cleanly, rather
97 * than waiting forever; see shm_mq_wait_internal. This is mostly useful in
98 * simple cases - e.g. where there are just 2 processes communicating; in
99 * more complex scenarios, not every process may have a BackgroundWorkerHandle
100 * available, or may need to watch for the failure of more than one other
101 * process at a time.
102 *
103 * When a message exists as a contiguous chunk of bytes in the queue - that is,
104 * it is smaller than the size of the ring buffer and does not wrap around
105 * the end - we return the message to the caller as a pointer into the buffer.
106 * For messages that are larger or happen to wrap, we reassemble the message
107 * locally by copying the chunks into a backend-local buffer. mqh_buffer is
108 * the buffer, and mqh_buflen is the number of bytes allocated for it.
109 *
110 * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
111 * are used to track the state of non-blocking operations. When the caller
112 * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
113 * are expected to retry the call at a later time with the same argument;
114 * we need to retain enough state to pick up where we left off.
115 * mqh_length_word_complete tracks whether we are done sending or receiving
116 * (whichever we're doing) the entire length word. mqh_partial_bytes tracks
117 * the number of bytes read or written for either the length word or the
118 * message itself, and mqh_expected_bytes - which is used only for reads -
119 * tracks the expected total size of the payload.
120 *
121 * mqh_counterparty_attached tracks whether we know the counterparty to have
122 * attached to the queue at some previous point. This lets us avoid some
123 * mutex acquisitions.
124 *
125 * mqh_context is the memory context in effect at the time we attached to
126 * the shm_mq. The shm_mq_handle itself is allocated in this context, and
127 * we make sure any other allocations we do happen in this context as well,
128 * to avoid nasty surprises.
129 */
130struct shm_mq_handle
131{
132 shm_mq *mqh_queue;
133 dsm_segment *mqh_segment;
134 BackgroundWorkerHandle *mqh_handle;
135 char *mqh_buffer;
136 Size mqh_buflen;
137 Size mqh_consume_pending;
138 Size mqh_partial_bytes;
139 Size mqh_expected_bytes;
140 bool mqh_length_word_complete;
141 bool mqh_counterparty_attached;
142 MemoryContext mqh_context;
143};
144
145static void shm_mq_detach_internal(shm_mq *mq);
146static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
147 const void *data, bool nowait, Size *bytes_written);
148static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
149 Size bytes_needed, bool nowait, Size *nbytesp,
150 void **datap);
151static bool shm_mq_counterparty_gone(shm_mq *mq,
152 BackgroundWorkerHandle *handle);
153static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr,
154 BackgroundWorkerHandle *handle);
155static void shm_mq_inc_bytes_read(shm_mq *mq, Size n);
156static void shm_mq_inc_bytes_written(shm_mq *mq, Size n);
157static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
158
159/* Minimum queue size is enough for header and at least one chunk of data. */
160const Size shm_mq_minimum_size =
161MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF;
162
163#define MQH_INITIAL_BUFSIZE 8192
164
165/*
166 * Initialize a new shared message queue.
167 */
168shm_mq *
169shm_mq_create(void *address, Size size)
170{
171 shm_mq *mq = address;
172 Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
173
174 /* If the size isn't MAXALIGN'd, just discard the odd bytes. */
175 size = MAXALIGN_DOWN(size);
176
177 /* Queue size must be large enough to hold some data. */
178 Assert(size > data_offset);
179
180 /* Initialize queue header. */
181 SpinLockInit(&mq->mq_mutex);
182 mq->mq_receiver = NULL;
183 mq->mq_sender = NULL;
184 pg_atomic_init_u64(&mq->mq_bytes_read, 0);
185 pg_atomic_init_u64(&mq->mq_bytes_written, 0);
186 mq->mq_ring_size = size - data_offset;
187 mq->mq_detached = false;
188 mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
189
190 return mq;
191}
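
/*
 * A minimal usage sketch (not part of this module): carving a queue out of a
 * freshly created DSM segment and attaching as the receiver.  Real callers
 * typically lay out the segment with a shm_toc; the 16kB size below is an
 * arbitrary choice for illustration.
 *
 *		#define MY_QUEUE_SIZE ((Size) 16384)
 *
 *		dsm_segment *seg = dsm_create(MY_QUEUE_SIZE, 0);
 *		shm_mq	   *mq = shm_mq_create(dsm_segment_address(seg),
 *									   MY_QUEUE_SIZE);
 *		shm_mq_handle *mqh;
 *
 *		shm_mq_set_receiver(mq, MyProc);
 *		mqh = shm_mq_attach(mq, seg, NULL);
 *
 * The sending process maps the same segment, calls shm_mq_set_sender() with
 * its own PGPROC, and then calls shm_mq_attach() on the same queue.
 */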
192
193/*
194 * Set the identity of the process that will receive from a shared message
195 * queue.
196 */
197void
198shm_mq_set_receiver(shm_mq *mq, PGPROC *proc)
199{
200 PGPROC *sender;
201
202 SpinLockAcquire(&mq->mq_mutex);
203 Assert(mq->mq_receiver == NULL);
204 mq->mq_receiver = proc;
205 sender = mq->mq_sender;
206 SpinLockRelease(&mq->mq_mutex);
207
208 if (sender != NULL)
209 SetLatch(&sender->procLatch);
210}
211
212/*
213 * Set the identity of the process that will send to a shared message queue.
214 */
215void
216shm_mq_set_sender(shm_mq *mq, PGPROC *proc)
217{
218 PGPROC *receiver;
219
220 SpinLockAcquire(&mq->mq_mutex);
221 Assert(mq->mq_sender == NULL);
222 mq->mq_sender = proc;
223 receiver = mq->mq_receiver;
224 SpinLockRelease(&mq->mq_mutex);
225
226 if (receiver != NULL)
227 SetLatch(&receiver->procLatch);
228}
229
230/*
231 * Get the configured receiver.
232 */
233PGPROC *
234shm_mq_get_receiver(shm_mq *mq)
235{
236 PGPROC *receiver;
237
238 SpinLockAcquire(&mq->mq_mutex);
239 receiver = mq->mq_receiver;
240 SpinLockRelease(&mq->mq_mutex);
241
242 return receiver;
243}
244
245/*
246 * Get the configured sender.
247 */
248PGPROC *
249shm_mq_get_sender(shm_mq *mq)
250{
251 PGPROC *sender;
252
253 SpinLockAcquire(&mq->mq_mutex);
254 sender = mq->mq_sender;
255 SpinLockRelease(&mq->mq_mutex);
256
257 return sender;
258}
259
260/*
261 * Attach to a shared message queue so we can send or receive messages.
262 *
263 * The memory context in effect at the time this function is called should
264 * be one which will last for at least as long as the message queue itself.
265 * We'll allocate the handle in that context, and future allocations that
266 * are needed to buffer incoming data will happen in that context as well.
267 *
268 * If seg != NULL, the queue will be automatically detached when that dynamic
269 * shared memory segment is detached.
270 *
271 * If handle != NULL, the queue can be read or written even before the
272 * other process has attached. We'll wait for it to do so if needed. The
273 * handle must be for a background worker initialized with bgw_notify_pid
274 * equal to our PID.
275 *
276 * shm_mq_detach() should be called when done. This will free the
277 * shm_mq_handle and mark the queue itself as detached, so that our
278 * counterpart won't get stuck waiting for us to fill or drain the queue
279 * after we've already lost interest.
280 */
281shm_mq_handle *
282shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
283{
284 shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle));
285
286 Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc);
287 mqh->mqh_queue = mq;
288 mqh->mqh_segment = seg;
289 mqh->mqh_handle = handle;
290 mqh->mqh_buffer = NULL;
291 mqh->mqh_buflen = 0;
292 mqh->mqh_consume_pending = 0;
293 mqh->mqh_partial_bytes = 0;
294 mqh->mqh_expected_bytes = 0;
295 mqh->mqh_length_word_complete = false;
296 mqh->mqh_counterparty_attached = false;
297 mqh->mqh_context = CurrentMemoryContext;
298
299 if (seg != NULL)
300 on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq));
301
302 return mqh;
303}
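
/*
 * Illustrative sketch (not part of this module): attaching with a
 * BackgroundWorkerHandle so that a worker that dies, or never starts, is
 * detected instead of being waited for forever.  "worker" is assumed to be a
 * BackgroundWorker struct already filled in with bgw_notify_pid set to
 * MyProcPid.
 *
 *		BackgroundWorkerHandle *wh;
 *
 *		shm_mq_set_receiver(mq, MyProc);
 *		if (!RegisterDynamicBackgroundWorker(&worker, &wh))
 *			ereport(ERROR,
 *					(errmsg("could not register background worker")));
 *		mqh = shm_mq_attach(mq, seg, wh);
 *
 * If the handle only becomes available after attaching, shm_mq_set_handle()
 * below can be used to associate it with an existing shm_mq_handle.
 */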
304
305/*
306 * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had
307 * been passed to shm_mq_attach.
308 */
309void
310shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
311{
312 Assert(mqh->mqh_handle == NULL);
313 mqh->mqh_handle = handle;
314}
315
316/*
317 * Write a message into a shared message queue.
318 */
319shm_mq_result
320shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
321{
322 shm_mq_iovec iov;
323
324 iov.data = data;
325 iov.len = nbytes;
326
327 return shm_mq_sendv(mqh, &iov, 1, nowait);
328}
329
330/*
331 * Write a message into a shared message queue, gathered from multiple
332 * addresses.
333 *
334 * When nowait = false, we'll wait on our process latch when the ring buffer
335 * fills up, and then continue writing once the receiver has drained some data.
336 * The process latch is reset after each wait.
337 *
338 * When nowait = true, we do not manipulate the state of the process latch;
339 * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In
340 * this case, the caller should call this function again, with the same
341 * arguments, each time the process latch is set. (Once begun, the sending
342 * of a message cannot be aborted except by detaching from the queue; changing
343 * the length or payload will corrupt the queue.)
344 */
345shm_mq_result
346shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
347{
348 shm_mq_result res;
349 shm_mq *mq = mqh->mqh_queue;
350 PGPROC *receiver;
351 Size nbytes = 0;
352 Size bytes_written;
353 int i;
354 int which_iov = 0;
355 Size offset;
356
357 Assert(mq->mq_sender == MyProc);
358
359 /* Compute total size of write. */
360 for (i = 0; i < iovcnt; ++i)
361 nbytes += iov[i].len;
362
363 /* Try to write, or finish writing, the length word into the buffer. */
364 while (!mqh->mqh_length_word_complete)
365 {
366 Assert(mqh->mqh_partial_bytes < sizeof(Size));
367 res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
368 ((char *) &nbytes) + mqh->mqh_partial_bytes,
369 nowait, &bytes_written);
370
371 if (res == SHM_MQ_DETACHED)
372 {
373 /* Reset state in case caller tries to send another message. */
374 mqh->mqh_partial_bytes = 0;
375 mqh->mqh_length_word_complete = false;
376 return res;
377 }
378 mqh->mqh_partial_bytes += bytes_written;
379
380 if (mqh->mqh_partial_bytes >= sizeof(Size))
381 {
382 Assert(mqh->mqh_partial_bytes == sizeof(Size));
383
384 mqh->mqh_partial_bytes = 0;
385 mqh->mqh_length_word_complete = true;
386 }
387
388 if (res != SHM_MQ_SUCCESS)
389 return res;
390
391 /* Length word can't be split unless bigger than required alignment. */
392 Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
393 }
394
395 /* Write the actual data bytes into the buffer. */
396 Assert(mqh->mqh_partial_bytes <= nbytes);
397 offset = mqh->mqh_partial_bytes;
398 do
399 {
400 Size chunksize;
401
402 /* Figure out which bytes need to be sent next. */
403 if (offset >= iov[which_iov].len)
404 {
405 offset -= iov[which_iov].len;
406 ++which_iov;
407 if (which_iov >= iovcnt)
408 break;
409 continue;
410 }
411
412 /*
413 * We want to avoid copying the data if at all possible, but every
414 * chunk of bytes we write into the queue has to be MAXALIGN'd, except
415 * the last. Thus, if a chunk other than the last one ends on a
416 * non-MAXALIGN'd boundary, we have to combine the tail end of its
417 * data with data from one or more following chunks until we either
418 * reach the last chunk or accumulate a number of bytes which is
419 * MAXALIGN'd.
420 */
421 if (which_iov + 1 < iovcnt &&
422 offset + MAXIMUM_ALIGNOF > iov[which_iov].len)
423 {
424 char tmpbuf[MAXIMUM_ALIGNOF];
425 int j = 0;
426
427 for (;;)
428 {
429 if (offset < iov[which_iov].len)
430 {
431 tmpbuf[j] = iov[which_iov].data[offset];
432 j++;
433 offset++;
434 if (j == MAXIMUM_ALIGNOF)
435 break;
436 }
437 else
438 {
439 offset -= iov[which_iov].len;
440 which_iov++;
441 if (which_iov >= iovcnt)
442 break;
443 }
444 }
445
446 res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written);
447
448 if (res == SHM_MQ_DETACHED)
449 {
450 /* Reset state in case caller tries to send another message. */
451 mqh->mqh_partial_bytes = 0;
452 mqh->mqh_length_word_complete = false;
453 return res;
454 }
455
456 mqh->mqh_partial_bytes += bytes_written;
457 if (res != SHM_MQ_SUCCESS)
458 return res;
459 continue;
460 }
461
462 /*
463 * If this is the last chunk, we can write all the data, even if it
464 * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to
465 * MAXALIGN_DOWN the write size.
466 */
467 chunksize = iov[which_iov].len - offset;
468 if (which_iov + 1 < iovcnt)
469 chunksize = MAXALIGN_DOWN(chunksize);
470 res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset],
471 nowait, &bytes_written);
472
473 if (res == SHM_MQ_DETACHED)
474 {
475 /* Reset state in case caller tries to send another message. */
476 mqh->mqh_length_word_complete = false;
477 mqh->mqh_partial_bytes = 0;
478 return res;
479 }
480
481 mqh->mqh_partial_bytes += bytes_written;
482 offset += bytes_written;
483 if (res != SHM_MQ_SUCCESS)
484 return res;
485 } while (mqh->mqh_partial_bytes < nbytes);
486
487 /* Reset for next message. */
488 mqh->mqh_partial_bytes = 0;
489 mqh->mqh_length_word_complete = false;
490
491 /* If queue has been detached, let caller know. */
492 if (mq->mq_detached)
493 return SHM_MQ_DETACHED;
494
495 /*
496 * If the counterparty is known to have attached, we can read mq_receiver
497 * without acquiring the spinlock and assume it isn't NULL. Otherwise,
498 * more caution is needed.
499 */
500 if (mqh->mqh_counterparty_attached)
501 receiver = mq->mq_receiver;
502 else
503 {
504 SpinLockAcquire(&mq->mq_mutex);
505 receiver = mq->mq_receiver;
506 SpinLockRelease(&mq->mq_mutex);
507 if (receiver == NULL)
508 return SHM_MQ_SUCCESS;
509 mqh->mqh_counterparty_attached = true;
510 }
511
512 /* Notify receiver of the newly-written data, and return. */
513 SetLatch(&receiver->procLatch);
514 return SHM_MQ_SUCCESS;
515}
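
/*
 * Illustrative sketch (not part of this module) of the retry protocol for
 * nowait sends described above: on SHM_MQ_WOULD_BLOCK, wait for the process
 * latch to be set and then retry with exactly the same arguments.  "msg" and
 * "msglen" are assumed to be supplied by the caller.
 *
 *		for (;;)
 *		{
 *			shm_mq_result res = shm_mq_send(mqh, msglen, msg, true);
 *
 *			if (res != SHM_MQ_WOULD_BLOCK)
 *				break;		-- SHM_MQ_SUCCESS or SHM_MQ_DETACHED
 *
 *			(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
 *							 WAIT_EVENT_MQ_SEND);
 *			ResetLatch(MyLatch);
 *			CHECK_FOR_INTERRUPTS();
 *		}
 */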
516
517/*
518 * Receive a message from a shared message queue.
519 *
520 * We set *nbytes to the message length and *data to point to the message
521 * payload. If the entire message exists in the queue as a single,
522 * contiguous chunk, *data will point directly into shared memory; otherwise,
523 * it will point to a temporary buffer. This mostly avoids data copying in
524 * the hoped-for case where messages are short compared to the buffer size,
525 * while still allowing longer messages. In either case, the return value
526 * remains valid until the next receive operation is performed on the queue.
527 *
528 * When nowait = false, we'll wait on our process latch when the ring buffer
529 * is empty and we have not yet received a full message. The sender will
530 * set our process latch after more data has been written, and we'll resume
531 * processing. Each call will therefore return a complete message
532 * (unless the sender detaches the queue).
533 *
534 * When nowait = true, we do not manipulate the state of the process latch;
535 * instead, whenever the buffer is empty and we need to read from it, we
536 * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this
537 * function again after the process latch has been set.
538 */
539shm_mq_result
540shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
541{
542 shm_mq *mq = mqh->mqh_queue;
543 shm_mq_result res;
544 Size rb = 0;
545 Size nbytes;
546 void *rawdata;
547
548 Assert(mq->mq_receiver == MyProc);
549
550 /* We can't receive data until the sender has attached. */
551 if (!mqh->mqh_counterparty_attached)
552 {
553 if (nowait)
554 {
555 bool counterparty_gone;
556
557 /*
558 * We shouldn't return at this point at all unless the sender
559 * hasn't attached yet. However, the correct return value depends
560 * on whether the sender is still attached. If we first test
561 * whether the sender has ever attached and then test whether the
562 * sender has detached, there's a race condition: a sender that
563 * attaches and detaches very quickly might fool us into thinking
564 * the sender never attached at all. So, test whether our
565 * counterparty is definitively gone first, and only afterwards
566 * check whether the sender ever attached in the first place.
567 */
568 counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle);
569 if (shm_mq_get_sender(mq) == NULL)
570 {
571 if (counterparty_gone)
572 return SHM_MQ_DETACHED;
573 else
574 return SHM_MQ_WOULD_BLOCK;
575 }
576 }
577 else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
578 && shm_mq_get_sender(mq) == NULL)
579 {
580 mq->mq_detached = true;
581 return SHM_MQ_DETACHED;
582 }
583 mqh->mqh_counterparty_attached = true;
584 }
585
586 /*
587 * If we've consumed an amount of data greater than 1/4th of the ring
588 * size, mark it consumed in shared memory. We try to avoid doing this
589 * unnecessarily when only a small amount of data has been consumed,
590 * because SetLatch() is fairly expensive and we don't want to do it too
591 * often.
592 */
593 if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
594 {
595 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
596 mqh->mqh_consume_pending = 0;
597 }
598
599 /* Try to read, or finish reading, the length word from the buffer. */
600 while (!mqh->mqh_length_word_complete)
601 {
602 /* Try to receive the message length word. */
603 Assert(mqh->mqh_partial_bytes < sizeof(Size));
604 res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
605 nowait, &rb, &rawdata);
606 if (res != SHM_MQ_SUCCESS)
607 return res;
608
609 /*
610 * Hopefully, we'll receive the entire message length word at once.
611 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
612 * multiple reads.
613 */
614 if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
615 {
616 Size needed;
617
618 nbytes = *(Size *) rawdata;
619
620 /* If we've already got the whole message, we're done. */
621 needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
622 if (rb >= needed)
623 {
624 mqh->mqh_consume_pending += needed;
625 *nbytesp = nbytes;
626 *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
627 return SHM_MQ_SUCCESS;
628 }
629
630 /*
631 * We don't have the whole message, but we at least have the whole
632 * length word.
633 */
634 mqh->mqh_expected_bytes = nbytes;
635 mqh->mqh_length_word_complete = true;
636 mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
637 rb -= MAXALIGN(sizeof(Size));
638 }
639 else
640 {
641 Size lengthbytes;
642
643 /* Can't be split unless bigger than required alignment. */
644 Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
645
646 /* Message word is split; need buffer to reassemble. */
647 if (mqh->mqh_buffer == NULL)
648 {
649 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
650 MQH_INITIAL_BUFSIZE);
651 mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
652 }
653 Assert(mqh->mqh_buflen >= sizeof(Size));
654
655 /* Copy partial length word; remember to consume it. */
656 if (mqh->mqh_partial_bytes + rb > sizeof(Size))
657 lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
658 else
659 lengthbytes = rb;
660 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
661 lengthbytes);
662 mqh->mqh_partial_bytes += lengthbytes;
663 mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
664 rb -= lengthbytes;
665
666 /* If we now have the whole word, we're ready to read payload. */
667 if (mqh->mqh_partial_bytes >= sizeof(Size))
668 {
669 Assert(mqh->mqh_partial_bytes == sizeof(Size));
670 mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer;
671 mqh->mqh_length_word_complete = true;
672 mqh->mqh_partial_bytes = 0;
673 }
674 }
675 }
676 nbytes = mqh->mqh_expected_bytes;
677
678 if (mqh->mqh_partial_bytes == 0)
679 {
680 /*
681 * Try to obtain the whole message in a single chunk. If this works,
682 * we need not copy the data and can return a pointer directly into
683 * shared memory.
684 */
685 res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
686 if (res != SHM_MQ_SUCCESS)
687 return res;
688 if (rb >= nbytes)
689 {
690 mqh->mqh_length_word_complete = false;
691 mqh->mqh_consume_pending += MAXALIGN(nbytes);
692 *nbytesp = nbytes;
693 *datap = rawdata;
694 return SHM_MQ_SUCCESS;
695 }
696
697 /*
698 * The message has wrapped the buffer. We'll need to copy it in order
699 * to return it to the client in one chunk. First, make sure we have
700 * a large enough buffer available.
701 */
702 if (mqh->mqh_buflen < nbytes)
703 {
704 Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
705
706 while (newbuflen < nbytes)
707 newbuflen *= 2;
708
709 if (mqh->mqh_buffer != NULL)
710 {
711 pfree(mqh->mqh_buffer);
712 mqh->mqh_buffer = NULL;
713 mqh->mqh_buflen = 0;
714 }
715 mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen);
716 mqh->mqh_buflen = newbuflen;
717 }
718 }
719
720 /* Loop until we've copied the entire message. */
721 for (;;)
722 {
723 Size still_needed;
724
725 /* Copy as much as we can. */
726 Assert(mqh->mqh_partial_bytes + rb <= nbytes);
727 memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
728 mqh->mqh_partial_bytes += rb;
729
730 /*
731 * Update count of bytes that can be consumed, accounting for
732 * alignment padding. Note that this will never actually insert any
733 * padding except at the end of a message, because the buffer size is
734 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
735 */
736 Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
737 mqh->mqh_consume_pending += MAXALIGN(rb);
738
739 /* If we got all the data, exit the loop. */
740 if (mqh->mqh_partial_bytes >= nbytes)
741 break;
742
743 /* Wait for some more data. */
744 still_needed = nbytes - mqh->mqh_partial_bytes;
745 res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
746 if (res != SHM_MQ_SUCCESS)
747 return res;
748 if (rb > still_needed)
749 rb = still_needed;
750 }
751
752 /* Return the complete message, and reset for next message. */
753 *nbytesp = nbytes;
754 *datap = mqh->mqh_buffer;
755 mqh->mqh_length_word_complete = false;
756 mqh->mqh_partial_bytes = 0;
757 return SHM_MQ_SUCCESS;
758}
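
/*
 * Illustrative sketch (not part of this module) of a nowait receive loop as
 * described above: poll the queue, and sleep on the process latch whenever a
 * complete message is not yet available.  process_message() is a hypothetical
 * stand-in for whatever the caller does with the result; the returned pointer
 * is only valid until the next receive operation.
 *
 *		for (;;)
 *		{
 *			Size	len;
 *			void   *data;
 *			shm_mq_result res = shm_mq_receive(mqh, &len, &data, true);
 *
 *			if (res == SHM_MQ_SUCCESS)
 *			{
 *				process_message(data, len);
 *				continue;
 *			}
 *			if (res == SHM_MQ_DETACHED)
 *				break;		-- sender is gone; treat as end of stream
 *
 *			(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
 *							 WAIT_EVENT_MQ_RECEIVE);
 *			ResetLatch(MyLatch);
 *			CHECK_FOR_INTERRUPTS();
 *		}
 */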
759
760/*
761 * Wait for the other process that's supposed to use this queue to attach
762 * to it.
763 *
764 * The return value is SHM_MQ_DETACHED if the worker has already detached or
765 * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached.
766 * Note that we will only be able to detect that the worker has died before
767 * attaching if a background worker handle was passed to shm_mq_attach().
768 */
769shm_mq_result
770shm_mq_wait_for_attach(shm_mq_handle *mqh)
771{
772 shm_mq *mq = mqh->mqh_queue;
773 PGPROC **victim;
774
775 if (shm_mq_get_receiver(mq) == MyProc)
776 victim = &mq->mq_sender;
777 else
778 {
779 Assert(shm_mq_get_sender(mq) == MyProc);
780 victim = &mq->mq_receiver;
781 }
782
783 if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle))
784 return SHM_MQ_SUCCESS;
785 else
786 return SHM_MQ_DETACHED;
787}
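
/*
 * Illustrative sketch (not part of this module): a caller that wants to be
 * sure its counterpart attached before doing further setup work.
 *
 *		if (shm_mq_wait_for_attach(mqh) != SHM_MQ_SUCCESS)
 *			ereport(ERROR,
 *					(errmsg("counterparty never attached to message queue")));
 */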
788
789/*
790 * Detach from a shared message queue, and destroy the shm_mq_handle.
791 */
792void
793shm_mq_detach(shm_mq_handle *mqh)
794{
795 /* Notify counterparty that we're outta here. */
796 shm_mq_detach_internal(mqh->mqh_queue);
797
798 /* Cancel on_dsm_detach callback, if any. */
799 if (mqh->mqh_segment)
800 cancel_on_dsm_detach(mqh->mqh_segment,
801 shm_mq_detach_callback,
802 PointerGetDatum(mqh->mqh_queue));
803
804 /* Release local memory associated with handle. */
805 if (mqh->mqh_buffer != NULL)
806 pfree(mqh->mqh_buffer);
807 pfree(mqh);
808}
809
810/*
811 * Notify counterparty that we're detaching from shared message queue.
812 *
813 * The purpose of this function is to make sure that the process
814 * with which we're communicating doesn't block forever waiting for us to
815 * fill or drain the queue once we've lost interest. When the sender
816 * detaches, the receiver can read any messages remaining in the queue;
817 * further reads will return SHM_MQ_DETACHED. If the receiver detaches,
818 * further attempts to send messages will likewise return SHM_MQ_DETACHED.
819 *
820 * This is separated out from shm_mq_detach() because if the on_dsm_detach
821 * callback fires, we only want to do this much. We do not try to touch
822 * the local shm_mq_handle, as it may have been pfree'd already.
823 */
824static void
825shm_mq_detach_internal(shm_mq *mq)
826{
827 PGPROC *victim;
828
829 SpinLockAcquire(&mq->mq_mutex);
830 if (mq->mq_sender == MyProc)
831 victim = mq->mq_receiver;
832 else
833 {
834 Assert(mq->mq_receiver == MyProc);
835 victim = mq->mq_sender;
836 }
837 mq->mq_detached = true;
838 SpinLockRelease(&mq->mq_mutex);
839
840 if (victim != NULL)
841 SetLatch(&victim->procLatch);
842}
843
844/*
845 * Get the shm_mq from handle.
846 */
847shm_mq *
848shm_mq_get_queue(shm_mq_handle *mqh)
849{
850 return mqh->mqh_queue;
851}
852
853/*
854 * Write bytes into a shared message queue.
855 */
856static shm_mq_result
857shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
858 bool nowait, Size *bytes_written)
859{
860 shm_mq *mq = mqh->mqh_queue;
861 Size sent = 0;
862 uint64 used;
863 Size ringsize = mq->mq_ring_size;
864 Size available;
865
866 while (sent < nbytes)
867 {
868 uint64 rb;
869 uint64 wb;
870
871 /* Compute number of ring buffer bytes used and available. */
872 rb = pg_atomic_read_u64(&mq->mq_bytes_read);
873 wb = pg_atomic_read_u64(&mq->mq_bytes_written);
874 Assert(wb >= rb);
875 used = wb - rb;
876 Assert(used <= ringsize);
877 available = Min(ringsize - used, nbytes - sent);
878
879 /*
880 * Bail out if the queue has been detached. Note that we would be in
881 * trouble if the compiler decided to cache the value of
882 * mq->mq_detached in a register or on the stack across loop
883 * iterations. It probably shouldn't do that anyway since we'll
884 * always return, call an external function that performs a system
885 * call, or reach a memory barrier at some point later in the loop,
886 * but just to be sure, insert a compiler barrier here.
887 */
888 pg_compiler_barrier();
889 if (mq->mq_detached)
890 {
891 *bytes_written = sent;
892 return SHM_MQ_DETACHED;
893 }
894
895 if (available == 0 && !mqh->mqh_counterparty_attached)
896 {
897 /*
898 * The queue is full, so if the receiver isn't yet known to be
899 * attached, we must wait for that to happen.
900 */
901 if (nowait)
902 {
903 if (shm_mq_counterparty_gone(mq, mqh->mqh_handle))
904 {
905 *bytes_written = sent;
906 return SHM_MQ_DETACHED;
907 }
908 if (shm_mq_get_receiver(mq) == NULL)
909 {
910 *bytes_written = sent;
911 return SHM_MQ_WOULD_BLOCK;
912 }
913 }
914 else if (!shm_mq_wait_internal(mq, &mq->mq_receiver,
915 mqh->mqh_handle))
916 {
917 mq->mq_detached = true;
918 *bytes_written = sent;
919 return SHM_MQ_DETACHED;
920 }
921 mqh->mqh_counterparty_attached = true;
922
923 /*
924 * The receiver may have read some data after attaching, so we
925 * must not wait without rechecking the queue state.
926 */
927 }
928 else if (available == 0)
929 {
930 /*
931 * Since mqh->mqh_counterparty_attached is known to be true at this
932 * point, mq_receiver has been set, and it can't change once set.
933 * Therefore, we can read it without acquiring the spinlock.
934 */
935 Assert(mqh->mqh_counterparty_attached);
936 SetLatch(&mq->mq_receiver->procLatch);
937
938 /* Skip manipulation of our latch if nowait = true. */
939 if (nowait)
940 {
941 *bytes_written = sent;
942 return SHM_MQ_WOULD_BLOCK;
943 }
944
945 /*
946 * Wait for our latch to be set. It might already be set for some
947 * unrelated reason, but that'll just result in one extra trip
948 * through the loop. It's worth it to avoid resetting the latch
949 * at top of loop, because setting an already-set latch is much
950 * cheaper than setting one that has been reset.
951 */
952 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
953 WAIT_EVENT_MQ_SEND);
954
955 /* Reset the latch so we don't spin. */
956 ResetLatch(MyLatch);
957
958 /* An interrupt may have occurred while we were waiting. */
959 CHECK_FOR_INTERRUPTS();
960 }
961 else
962 {
963 Size offset;
964 Size sendnow;
965
966 offset = wb % (uint64) ringsize;
967 sendnow = Min(available, ringsize - offset);
968
969 /*
970 * Write as much data as we can via a single memcpy(). Make sure
971 * these writes happen after the read of mq_bytes_read, above.
972 * This barrier pairs with the one in shm_mq_inc_bytes_read.
973 * (Since we're separating the read of mq_bytes_read from a
974 * subsequent write to mq_ring, we need a full barrier here.)
975 */
976 pg_memory_barrier();
977 memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
978 (char *) data + sent, sendnow);
979 sent += sendnow;
980
981 /*
982 * Update count of bytes written, with alignment padding. Note
983 * that this will never actually insert any padding except at the
984 * end of a message, because the buffer size is a multiple of
985 * MAXIMUM_ALIGNOF, and each read and write is as well.
986 */
987 Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
988 shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
989
990 /*
991 * For efficiency, we don't set the reader's latch here. We'll do
992 * that only when the buffer fills up or after writing an entire
993 * message.
994 */
995 }
996 }
997
998 *bytes_written = sent;
999 return SHM_MQ_SUCCESS;
1000}
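
/*
 * Worked example of the wraparound handling above, with made-up numbers.
 * Suppose ringsize = 64, mq_bytes_read = 32, mq_bytes_written = 56, and we
 * are asked to send nbytes = 40.  On the first pass through the loop:
 *
 *		used      = 56 - 32          = 24
 *		available = Min(64 - 24, 40) = 40
 *		offset    = 56 % 64          = 56
 *		sendnow   = Min(40, 64 - 56) = 8
 *
 * so 8 bytes are copied into ring offsets 56..63 and mq_bytes_written
 * becomes 64.  On the next pass, offset wraps to 0 and sendnow is 32, so the
 * remaining bytes land at the start of the ring.
 */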
1001
1002/*
1003 * Wait until at least bytes_needed bytes are available to be read from the
1004 * shared message queue, or until the buffer wraps around. If the queue is
1005 * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait
1006 * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set
1007 * to the location at which data bytes can be read, *nbytesp is set to the
1008 * number of bytes which can be read at that address, and the return value
1009 * is SHM_MQ_SUCCESS.
1010 */
1011static shm_mq_result
1012shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
1013 Size *nbytesp, void **datap)
1014{
1015 shm_mq *mq = mqh->mqh_queue;
1016 Size ringsize = mq->mq_ring_size;
1017 uint64 used;
1018 uint64 written;
1019
1020 for (;;)
1021 {
1022 Size offset;
1023 uint64 read;
1024
1025 /* Get bytes written, so we can compute what's available to read. */
1026 written = pg_atomic_read_u64(&mq->mq_bytes_written);
1027
1028 /*
1029 * Get bytes read. Include bytes we could consume but have not yet
1030 * consumed.
1031 */
1032 read = pg_atomic_read_u64(&mq->mq_bytes_read) +
1033 mqh->mqh_consume_pending;
1034 used = written - read;
1035 Assert(used <= ringsize);
1036 offset = read % (uint64) ringsize;
1037
1038 /* If we have enough data or buffer has wrapped, we're done. */
1039 if (used >= bytes_needed || offset + used >= ringsize)
1040 {
1041 *nbytesp = Min(used, ringsize - offset);
1042 *datap = &mq->mq_ring[mq->mq_ring_offset + offset];
1043
1044 /*
1045 * Separate the read of mq_bytes_written, above, from caller's
1046 * attempt to read the data itself. Pairs with the barrier in
1047 * shm_mq_inc_bytes_written.
1048 */
1049 pg_read_barrier();
1050 return SHM_MQ_SUCCESS;
1051 }
1052
1053 /*
1054 * Fall out before waiting if the queue has been detached.
1055 *
1056 * Note that we don't check for this until *after* considering whether
1057 * the data already available is enough, since the receiver can finish
1058 * receiving a message stored in the buffer even after the sender has
1059 * detached.
1060 */
1061 if (mq->mq_detached)
1062 {
1063 /*
1064 * If the writer advanced mq_bytes_written and then set
1065 * mq_detached, we might not have read the final value of
1066 * mq_bytes_written above. Insert a read barrier and then check
1067 * again if mq_bytes_written has advanced.
1068 */
1069 pg_read_barrier();
1070 if (written != pg_atomic_read_u64(&mq->mq_bytes_written))
1071 continue;
1072
1073 return SHM_MQ_DETACHED;
1074 }
1075
1076 /*
1077 * We didn't get enough data to satisfy the request, so mark any data
1078 * previously-consumed as read to make more buffer space.
1079 */
1080 if (mqh->mqh_consume_pending > 0)
1081 {
1082 shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
1083 mqh->mqh_consume_pending = 0;
1084 }
1085
1086 /* Skip manipulation of our latch if nowait = true. */
1087 if (nowait)
1088 return SHM_MQ_WOULD_BLOCK;
1089
1090 /*
1091 * Wait for our latch to be set. It might already be set for some
1092 * unrelated reason, but that'll just result in one extra trip through
1093 * the loop. It's worth it to avoid resetting the latch at top of
1094 * loop, because setting an already-set latch is much cheaper than
1095 * setting one that has been reset.
1096 */
1097 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1098 WAIT_EVENT_MQ_RECEIVE);
1099
1100 /* Reset the latch so we don't spin. */
1101 ResetLatch(MyLatch);
1102
1103 /* An interrupt may have occurred while we were waiting. */
1104 CHECK_FOR_INTERRUPTS();
1105 }
1106}
1107
1108/*
1109 * Test whether a counterparty who may not even be alive yet is definitely gone.
1110 */
1111static bool
1112shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle)
1113{
1114 pid_t pid;
1115
1116 /* If the queue has been detached, counterparty is definitely gone. */
1117 if (mq->mq_detached)
1118 return true;
1119
1120 /* If there's a handle, check worker status. */
1121 if (handle != NULL)
1122 {
1123 BgwHandleStatus status;
1124
1125 /* Check for unexpected worker death. */
1126 status = GetBackgroundWorkerPid(handle, &pid);
1127 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1128 {
1129 /* Mark it detached, just to make it official. */
1130 mq->mq_detached = true;
1131 return true;
1132 }
1133 }
1134
1135 /* Counterparty is not definitively gone. */
1136 return false;
1137}
1138
1139/*
1140 * This is used when a process is waiting for its counterpart to attach to the
1141 * queue. We exit when the other process attaches as expected, or, if
1142 * handle != NULL, when the referenced background process or the postmaster
1143 * dies. Note that if handle == NULL, and the process fails to attach, we'll
1144 * potentially get stuck here forever waiting for a process that may never
1145 * start. We do check for interrupts, though.
1146 *
1147 * ptr is a pointer to the memory address that we're expecting to become
1148 * non-NULL when our counterpart attaches to the queue.
1149 */
1150static bool
1151shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle)
1152{
1153 bool result = false;
1154
1155 for (;;)
1156 {
1157 BgwHandleStatus status;
1158 pid_t pid;
1159
1160 /* Acquire the lock just long enough to check the pointer. */
1161 SpinLockAcquire(&mq->mq_mutex);
1162 result = (*ptr != NULL);
1163 SpinLockRelease(&mq->mq_mutex);
1164
1165 /* Fail if detached; else succeed if initialized. */
1166 if (mq->mq_detached)
1167 {
1168 result = false;
1169 break;
1170 }
1171 if (result)
1172 break;
1173
1174 if (handle != NULL)
1175 {
1176 /* Check for unexpected worker death. */
1177 status = GetBackgroundWorkerPid(handle, &pid);
1178 if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
1179 {
1180 result = false;
1181 break;
1182 }
1183 }
1184
1185 /* Wait to be signalled. */
1186 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
1187 WAIT_EVENT_MQ_INTERNAL);
1188
1189 /* Reset the latch so we don't spin. */
1190 ResetLatch(MyLatch);
1191
1192 /* An interrupt may have occurred while we were waiting. */
1193 CHECK_FOR_INTERRUPTS();
1194 }
1195
1196 return result;
1197}
1198
1199/*
1200 * Increment the number of bytes read.
1201 */
1202static void
1203shm_mq_inc_bytes_read(shm_mq *mq, Size n)
1204{
1205 PGPROC *sender;
1206
1207 /*
1208 * Separate prior reads of mq_ring from the increment of mq_bytes_read
1209 * which follows. This pairs with the full barrier in
1210 * shm_mq_send_bytes(). We only need a read barrier here because the
1211 * increment of mq_bytes_read is actually a read followed by a dependent
1212 * write.
1213 */
1214 pg_read_barrier();
1215
1216 /*
1217 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1218 * else can be changing this value. This method should be cheaper.
1219 */
1220 pg_atomic_write_u64(&mq->mq_bytes_read,
1221 pg_atomic_read_u64(&mq->mq_bytes_read) + n);
1222
1223 /*
1224 * We shouldn't have any bytes to read without a sender, so we can read
1225 * mq_sender here without a lock. Once it's initialized, it can't change.
1226 */
1227 sender = mq->mq_sender;
1228 Assert(sender != NULL);
1229 SetLatch(&sender->procLatch);
1230}
1231
1232/*
1233 * Increment the number of bytes written.
1234 */
1235static void
1236shm_mq_inc_bytes_written(shm_mq *mq, Size n)
1237{
1238 /*
1239 * Separate prior reads of mq_ring from the write of mq_bytes_written
1240 * which we're about to do. Pairs with the read barrier found in
1241 * shm_mq_receive_bytes.
1242 */
1243 pg_write_barrier();
1244
1245 /*
1246 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
1247 * else can be changing this value. This method avoids taking the bus
1248 * lock unnecessarily.
1249 */
1250 pg_atomic_write_u64(&mq->mq_bytes_written,
1251 pg_atomic_read_u64(&mq->mq_bytes_written) + n);
1252}
1253
1254/* Shim for on_dsm_detach callback. */
1255static void
1256shm_mq_detach_callback(dsm_segment *seg, Datum arg)
1257{
1258 shm_mq *mq = (shm_mq *) DatumGetPointer(arg);
1259
1260 shm_mq_detach_internal(mq);
1261}
1262