1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * shm_mq.c |
4 | * single-reader, single-writer shared memory message queue |
5 | * |
6 | * Both the sender and the receiver must have a PGPROC; their respective |
7 | * process latches are used for synchronization. Only the sender may send, |
8 | * and only the receiver may receive. This is intended to allow a user |
9 | * backend to communicate with worker backends that it has registered. |
10 | * |
11 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
12 | * Portions Copyright (c) 1994, Regents of the University of California |
13 | * |
14 | * src/backend/storage/ipc/shm_mq.c |
15 | * |
16 | *------------------------------------------------------------------------- |
17 | */ |
18 | |
19 | #include "postgres.h" |
20 | |
21 | #include "miscadmin.h" |
22 | #include "pgstat.h" |
23 | #include "postmaster/bgworker.h" |
24 | #include "storage/procsignal.h" |
25 | #include "storage/shm_mq.h" |
26 | #include "storage/spin.h" |
27 | |
28 | /* |
29 | * This structure represents the actual queue, stored in shared memory. |
30 | * |
31 | * Some notes on synchronization: |
32 | * |
33 | * mq_receiver and mq_bytes_read can only be changed by the receiver; and |
34 | * mq_sender and mq_bytes_written can only be changed by the sender. |
35 | * mq_receiver and mq_sender are protected by mq_mutex, although, importantly, |
36 | * they cannot change once set, and thus may be read without a lock once this |
37 | * is known to be the case. |
38 | * |
39 | * mq_bytes_read and mq_bytes_written are not protected by the mutex. Instead, |
40 | * they are written atomically using 8 byte loads and stores. Memory barriers |
41 | * must be carefully used to synchronize reads and writes of these values with |
42 | * reads and writes of the actual data in mq_ring. |
43 | * |
44 | * mq_detached needs no locking. It can be set by either the sender or the |
45 | * receiver, but only ever from false to true, so redundant writes don't |
46 | * matter. It is important that if we set mq_detached and then set the |
47 | * counterparty's latch, the counterparty must be certain to see the change |
48 | * after waking up. Since SetLatch begins with a memory barrier and ResetLatch |
49 | * ends with one, this should be OK. |
50 | * |
51 | * mq_ring_size and mq_ring_offset never change after initialization, and |
52 | * can therefore be read without the lock. |
53 | * |
54 | * Importantly, mq_ring can be safely read and written without a lock. |
55 | * At any given time, the difference between mq_bytes_read and |
56 | * mq_bytes_written defines the number of bytes within mq_ring that contain |
57 | * unread data, and mq_bytes_read defines the position where those bytes |
58 | * begin. The sender can increase the number of unread bytes at any time, |
59 | * but only the receiver can give license to overwrite those bytes, by |
60 | * incrementing mq_bytes_read. Therefore, it's safe for the receiver to read |
61 | * the unread bytes it knows to be present without the lock. Conversely, |
62 | * the sender can write to the unused portion of the ring buffer without |
63 | * the lock, because nobody else can be reading or writing those bytes. The |
64 | * receiver could be making more bytes unused by incrementing mq_bytes_read, |
65 | * but that's OK. Note that it would be unsafe for the receiver to read any |
66 | * data it's already marked as read, or to write any data; and it would be |
67 | * unsafe for the sender to reread any data after incrementing |
68 | * mq_bytes_written, but fortunately there's no need for any of that. |
69 | */ |
70 | struct shm_mq |
71 | { |
72 | slock_t mq_mutex; |
73 | PGPROC *mq_receiver; |
74 | PGPROC *mq_sender; |
75 | pg_atomic_uint64 mq_bytes_read; |
76 | pg_atomic_uint64 mq_bytes_written; |
77 | Size mq_ring_size; |
78 | bool mq_detached; |
79 | uint8 mq_ring_offset; |
80 | char mq_ring[FLEXIBLE_ARRAY_MEMBER]; |
81 | }; |
82 | |
83 | /* |
84 | * This structure is a backend-private handle for access to a queue. |
85 | * |
86 | * mqh_queue is a pointer to the queue we've attached, and mqh_segment is |
87 | * an optional pointer to the dynamic shared memory segment that contains it. |
88 | * (If mqh_segment is provided, we register an on_dsm_detach callback to |
89 | * make sure we detach from the queue before detaching from DSM.) |
90 | * |
91 | * If this queue is intended to connect the current process with a background |
92 | * worker that started it, the user can pass a pointer to the worker handle |
93 | * to shm_mq_attach(), and we'll store it in mqh_handle. The point of this |
94 | * is to allow us to begin sending to or receiving from that queue before the |
95 | * process we'll be communicating with has even been started. If it fails |
96 | * to start, the handle will allow us to notice that and fail cleanly, rather |
97 | * than waiting forever; see shm_mq_wait_internal. This is mostly useful in |
98 | * simple cases - e.g. where there are just 2 processes communicating; in |
99 | * more complex scenarios, every process may not have a BackgroundWorkerHandle |
100 | * available, or may need to watch for the failure of more than one other |
101 | * process at a time. |
102 | * |
103 | * When a message exists as a contiguous chunk of bytes in the queue - that is, |
104 | * it is smaller than the size of the ring buffer and does not wrap around |
105 | * the end - we return the message to the caller as a pointer into the buffer. |
106 | * For messages that are larger or happen to wrap, we reassemble the message |
107 | * locally by copying the chunks into a backend-local buffer. mqh_buffer is |
108 | * the buffer, and mqh_buflen is the number of bytes allocated for it. |
109 | * |
110 | * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete |
111 | * are used to track the state of non-blocking operations. When the caller |
112 | * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they |
113 | * are expected to retry the call at a later time with the same argument; |
114 | * we need to retain enough state to pick up where we left off. |
115 | * mqh_length_word_complete tracks whether we are done sending or receiving |
116 | * (whichever we're doing) the entire length word. mqh_partial_bytes tracks |
117 | * the number of bytes read or written for either the length word or the |
118 | * message itself, and mqh_expected_bytes - which is used only for reads - |
119 | * tracks the expected total size of the payload. |
120 | * |
121 | * mqh_counterparty_attached tracks whether we know the counterparty to have |
122 | * attached to the queue at some previous point. This lets us avoid some |
123 | * mutex acquisitions. |
124 | * |
125 | * mqh_context is the memory context in effect at the time we attached to |
126 | * the shm_mq. The shm_mq_handle itself is allocated in this context, and |
127 | * we make sure any other allocations we do happen in this context as well, |
128 | * to avoid nasty surprises. |
129 | */ |
130 | struct shm_mq_handle |
131 | { |
132 | shm_mq *mqh_queue; |
133 | dsm_segment *mqh_segment; |
134 | BackgroundWorkerHandle *mqh_handle; |
135 | char *mqh_buffer; |
136 | Size mqh_buflen; |
137 | Size mqh_consume_pending; |
138 | Size mqh_partial_bytes; |
139 | Size mqh_expected_bytes; |
140 | bool mqh_length_word_complete; |
141 | bool mqh_counterparty_attached; |
142 | MemoryContext mqh_context; |
143 | }; |
144 | |
145 | static void shm_mq_detach_internal(shm_mq *mq); |
146 | static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, |
147 | const void *data, bool nowait, Size *bytes_written); |
148 | static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh, |
149 | Size bytes_needed, bool nowait, Size *nbytesp, |
150 | void **datap); |
151 | static bool shm_mq_counterparty_gone(shm_mq *mq, |
152 | BackgroundWorkerHandle *handle); |
153 | static bool shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, |
154 | BackgroundWorkerHandle *handle); |
155 | static void shm_mq_inc_bytes_read(shm_mq *mq, Size n); |
156 | static void shm_mq_inc_bytes_written(shm_mq *mq, Size n); |
157 | static void shm_mq_detach_callback(dsm_segment *seg, Datum arg); |
158 | |
159 | /* Minimum queue size is enough for header and at least one chunk of data. */ |
160 | const Size shm_mq_minimum_size = |
161 | MAXALIGN(offsetof(shm_mq, mq_ring)) + MAXIMUM_ALIGNOF; |
162 | |
163 | #define MQH_INITIAL_BUFSIZE 8192 |
164 | |
165 | /* |
166 | * Initialize a new shared message queue. |
167 | */ |
168 | shm_mq * |
169 | shm_mq_create(void *address, Size size) |
170 | { |
171 | shm_mq *mq = address; |
172 | Size data_offset = MAXALIGN(offsetof(shm_mq, mq_ring)); |
173 | |
174 | /* If the size isn't MAXALIGN'd, just discard the odd bytes. */ |
175 | size = MAXALIGN_DOWN(size); |
176 | |
177 | /* Queue size must be large enough to hold some data. */ |
178 | Assert(size > data_offset); |
179 | |
180 | /* Initialize queue header. */ |
181 | SpinLockInit(&mq->mq_mutex); |
182 | mq->mq_receiver = NULL; |
183 | mq->mq_sender = NULL; |
184 | pg_atomic_init_u64(&mq->mq_bytes_read, 0); |
185 | pg_atomic_init_u64(&mq->mq_bytes_written, 0); |
186 | mq->mq_ring_size = size - data_offset; |
187 | mq->mq_detached = false; |
188 | mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring); |
189 | |
190 | return mq; |
191 | } |
192 | |
193 | /* |
194 | * Set the identity of the process that will receive from a shared message |
195 | * queue. |
196 | */ |
197 | void |
198 | shm_mq_set_receiver(shm_mq *mq, PGPROC *proc) |
199 | { |
200 | PGPROC *sender; |
201 | |
202 | SpinLockAcquire(&mq->mq_mutex); |
203 | Assert(mq->mq_receiver == NULL); |
204 | mq->mq_receiver = proc; |
205 | sender = mq->mq_sender; |
206 | SpinLockRelease(&mq->mq_mutex); |
207 | |
208 | if (sender != NULL) |
209 | SetLatch(&sender->procLatch); |
210 | } |
211 | |
212 | /* |
213 | * Set the identity of the process that will send to a shared message queue. |
214 | */ |
215 | void |
216 | shm_mq_set_sender(shm_mq *mq, PGPROC *proc) |
217 | { |
218 | PGPROC *receiver; |
219 | |
220 | SpinLockAcquire(&mq->mq_mutex); |
221 | Assert(mq->mq_sender == NULL); |
222 | mq->mq_sender = proc; |
223 | receiver = mq->mq_receiver; |
224 | SpinLockRelease(&mq->mq_mutex); |
225 | |
226 | if (receiver != NULL) |
227 | SetLatch(&receiver->procLatch); |
228 | } |
229 | |
230 | /* |
231 | * Get the configured receiver. |
232 | */ |
233 | PGPROC * |
234 | shm_mq_get_receiver(shm_mq *mq) |
235 | { |
236 | PGPROC *receiver; |
237 | |
238 | SpinLockAcquire(&mq->mq_mutex); |
239 | receiver = mq->mq_receiver; |
240 | SpinLockRelease(&mq->mq_mutex); |
241 | |
242 | return receiver; |
243 | } |
244 | |
245 | /* |
246 | * Get the configured sender. |
247 | */ |
248 | PGPROC * |
249 | shm_mq_get_sender(shm_mq *mq) |
250 | { |
251 | PGPROC *sender; |
252 | |
253 | SpinLockAcquire(&mq->mq_mutex); |
254 | sender = mq->mq_sender; |
255 | SpinLockRelease(&mq->mq_mutex); |
256 | |
257 | return sender; |
258 | } |
259 | |
260 | /* |
261 | * Attach to a shared message queue so we can send or receive messages. |
262 | * |
263 | * The memory context in effect at the time this function is called should |
264 | * be one which will last for at least as long as the message queue itself. |
265 | * We'll allocate the handle in that context, and future allocations that |
266 | * are needed to buffer incoming data will happen in that context as well. |
267 | * |
268 | * If seg != NULL, the queue will be automatically detached when that dynamic |
269 | * shared memory segment is detached. |
270 | * |
271 | * If handle != NULL, the queue can be read or written even before the |
272 | * other process has attached. We'll wait for it to do so if needed. The |
273 | * handle must be for a background worker initialized with bgw_notify_pid |
274 | * equal to our PID. |
275 | * |
276 | * shm_mq_detach() should be called when done. This will free the |
277 | * shm_mq_handle and mark the queue itself as detached, so that our |
278 | * counterpart won't get stuck waiting for us to fill or drain the queue |
279 | * after we've already lost interest. |
280 | */ |
281 | shm_mq_handle * |
282 | shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) |
283 | { |
284 | shm_mq_handle *mqh = palloc(sizeof(shm_mq_handle)); |
285 | |
286 | Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc); |
287 | mqh->mqh_queue = mq; |
288 | mqh->mqh_segment = seg; |
289 | mqh->mqh_handle = handle; |
290 | mqh->mqh_buffer = NULL; |
291 | mqh->mqh_buflen = 0; |
292 | mqh->mqh_consume_pending = 0; |
293 | mqh->mqh_partial_bytes = 0; |
294 | mqh->mqh_expected_bytes = 0; |
295 | mqh->mqh_length_word_complete = false; |
296 | mqh->mqh_counterparty_attached = false; |
297 | mqh->mqh_context = CurrentMemoryContext; |
298 | |
299 | if (seg != NULL) |
300 | on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq)); |
301 | |
302 | return mqh; |
303 | } |
304 | |
305 | /* |
306 | * Associate a BackgroundWorkerHandle with a shm_mq_handle just as if it had |
307 | * been passed to shm_mq_attach. |
308 | */ |
309 | void |
310 | shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle) |
311 | { |
312 | Assert(mqh->mqh_handle == NULL); |
313 | mqh->mqh_handle = handle; |
314 | } |
315 | |
316 | /* |
317 | * Write a message into a shared message queue. |
318 | */ |
319 | shm_mq_result |
320 | shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait) |
321 | { |
322 | shm_mq_iovec iov; |
323 | |
324 | iov.data = data; |
325 | iov.len = nbytes; |
326 | |
327 | return shm_mq_sendv(mqh, &iov, 1, nowait); |
328 | } |
329 | |
330 | /* |
331 | * Write a message into a shared message queue, gathered from multiple |
332 | * addresses. |
333 | * |
334 | * When nowait = false, we'll wait on our process latch when the ring buffer |
335 | * fills up, and then continue writing once the receiver has drained some data. |
336 | * The process latch is reset after each wait. |
337 | * |
338 | * When nowait = true, we do not manipulate the state of the process latch; |
339 | * instead, if the buffer becomes full, we return SHM_MQ_WOULD_BLOCK. In |
340 | * this case, the caller should call this function again, with the same |
341 | * arguments, each time the process latch is set. (Once begun, the sending |
342 | * of a message cannot be aborted except by detaching from the queue; changing |
343 | * the length or payload will corrupt the queue.) |
344 | */ |
345 | shm_mq_result |
346 | shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait) |
347 | { |
348 | shm_mq_result res; |
349 | shm_mq *mq = mqh->mqh_queue; |
350 | PGPROC *receiver; |
351 | Size nbytes = 0; |
352 | Size bytes_written; |
353 | int i; |
354 | int which_iov = 0; |
355 | Size offset; |
356 | |
357 | Assert(mq->mq_sender == MyProc); |
358 | |
359 | /* Compute total size of write. */ |
360 | for (i = 0; i < iovcnt; ++i) |
361 | nbytes += iov[i].len; |
362 | |
363 | /* Try to write, or finish writing, the length word into the buffer. */ |
364 | while (!mqh->mqh_length_word_complete) |
365 | { |
366 | Assert(mqh->mqh_partial_bytes < sizeof(Size)); |
367 | res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes, |
368 | ((char *) &nbytes) + mqh->mqh_partial_bytes, |
369 | nowait, &bytes_written); |
370 | |
371 | if (res == SHM_MQ_DETACHED) |
372 | { |
373 | /* Reset state in case caller tries to send another message. */ |
374 | mqh->mqh_partial_bytes = 0; |
375 | mqh->mqh_length_word_complete = false; |
376 | return res; |
377 | } |
378 | mqh->mqh_partial_bytes += bytes_written; |
379 | |
380 | if (mqh->mqh_partial_bytes >= sizeof(Size)) |
381 | { |
382 | Assert(mqh->mqh_partial_bytes == sizeof(Size)); |
383 | |
384 | mqh->mqh_partial_bytes = 0; |
385 | mqh->mqh_length_word_complete = true; |
386 | } |
387 | |
388 | if (res != SHM_MQ_SUCCESS) |
389 | return res; |
390 | |
391 | /* Length word can't be split unless bigger than required alignment. */ |
392 | Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF); |
393 | } |
394 | |
395 | /* Write the actual data bytes into the buffer. */ |
396 | Assert(mqh->mqh_partial_bytes <= nbytes); |
397 | offset = mqh->mqh_partial_bytes; |
398 | do |
399 | { |
400 | Size chunksize; |
401 | |
402 | /* Figure out which bytes need to be sent next. */ |
403 | if (offset >= iov[which_iov].len) |
404 | { |
405 | offset -= iov[which_iov].len; |
406 | ++which_iov; |
407 | if (which_iov >= iovcnt) |
408 | break; |
409 | continue; |
410 | } |
411 | |
412 | /* |
413 | * We want to avoid copying the data if at all possible, but every |
414 | * chunk of bytes we write into the queue has to be MAXALIGN'd, except |
415 | * the last. Thus, if a chunk other than the last one ends on a |
416 | * non-MAXALIGN'd boundary, we have to combine the tail end of its |
417 | * data with data from one or more following chunks until we either |
418 | * reach the last chunk or accumulate a number of bytes which is |
419 | * MAXALIGN'd. |
420 | */ |
421 | if (which_iov + 1 < iovcnt && |
422 | offset + MAXIMUM_ALIGNOF > iov[which_iov].len) |
423 | { |
424 | char tmpbuf[MAXIMUM_ALIGNOF]; |
425 | int j = 0; |
426 | |
427 | for (;;) |
428 | { |
429 | if (offset < iov[which_iov].len) |
430 | { |
431 | tmpbuf[j] = iov[which_iov].data[offset]; |
432 | j++; |
433 | offset++; |
434 | if (j == MAXIMUM_ALIGNOF) |
435 | break; |
436 | } |
437 | else |
438 | { |
439 | offset -= iov[which_iov].len; |
440 | which_iov++; |
441 | if (which_iov >= iovcnt) |
442 | break; |
443 | } |
444 | } |
445 | |
446 | res = shm_mq_send_bytes(mqh, j, tmpbuf, nowait, &bytes_written); |
447 | |
448 | if (res == SHM_MQ_DETACHED) |
449 | { |
450 | /* Reset state in case caller tries to send another message. */ |
451 | mqh->mqh_partial_bytes = 0; |
452 | mqh->mqh_length_word_complete = false; |
453 | return res; |
454 | } |
455 | |
456 | mqh->mqh_partial_bytes += bytes_written; |
457 | if (res != SHM_MQ_SUCCESS) |
458 | return res; |
459 | continue; |
460 | } |
461 | |
462 | /* |
463 | * If this is the last chunk, we can write all the data, even if it |
464 | * isn't a multiple of MAXIMUM_ALIGNOF. Otherwise, we need to |
465 | * MAXALIGN_DOWN the write size. |
466 | */ |
467 | chunksize = iov[which_iov].len - offset; |
468 | if (which_iov + 1 < iovcnt) |
469 | chunksize = MAXALIGN_DOWN(chunksize); |
470 | res = shm_mq_send_bytes(mqh, chunksize, &iov[which_iov].data[offset], |
471 | nowait, &bytes_written); |
472 | |
473 | if (res == SHM_MQ_DETACHED) |
474 | { |
475 | /* Reset state in case caller tries to send another message. */ |
476 | mqh->mqh_length_word_complete = false; |
477 | mqh->mqh_partial_bytes = 0; |
478 | return res; |
479 | } |
480 | |
481 | mqh->mqh_partial_bytes += bytes_written; |
482 | offset += bytes_written; |
483 | if (res != SHM_MQ_SUCCESS) |
484 | return res; |
485 | } while (mqh->mqh_partial_bytes < nbytes); |
486 | |
487 | /* Reset for next message. */ |
488 | mqh->mqh_partial_bytes = 0; |
489 | mqh->mqh_length_word_complete = false; |
490 | |
491 | /* If queue has been detached, let caller know. */ |
492 | if (mq->mq_detached) |
493 | return SHM_MQ_DETACHED; |
494 | |
495 | /* |
496 | * If the counterparty is known to have attached, we can read mq_receiver |
497 | * without acquiring the spinlock and assume it isn't NULL. Otherwise, |
498 | * more caution is needed. |
499 | */ |
500 | if (mqh->mqh_counterparty_attached) |
501 | receiver = mq->mq_receiver; |
502 | else |
503 | { |
504 | SpinLockAcquire(&mq->mq_mutex); |
505 | receiver = mq->mq_receiver; |
506 | SpinLockRelease(&mq->mq_mutex); |
507 | if (receiver == NULL) |
508 | return SHM_MQ_SUCCESS; |
509 | mqh->mqh_counterparty_attached = true; |
510 | } |
511 | |
512 | /* Notify receiver of the newly-written data, and return. */ |
513 | SetLatch(&receiver->procLatch); |
514 | return SHM_MQ_SUCCESS; |
515 | } |
516 | |
517 | /* |
518 | * Receive a message from a shared message queue. |
519 | * |
520 | * We set *nbytes to the message length and *data to point to the message |
521 | * payload. If the entire message exists in the queue as a single, |
522 | * contiguous chunk, *data will point directly into shared memory; otherwise, |
523 | * it will point to a temporary buffer. This mostly avoids data copying in |
524 | * the hoped-for case where messages are short compared to the buffer size, |
525 | * while still allowing longer messages. In either case, the return value |
526 | * remains valid until the next receive operation is performed on the queue. |
527 | * |
528 | * When nowait = false, we'll wait on our process latch when the ring buffer |
529 | * is empty and we have not yet received a full message. The sender will |
530 | * set our process latch after more data has been written, and we'll resume |
531 | * processing. Each call will therefore return a complete message |
532 | * (unless the sender detaches the queue). |
533 | * |
534 | * When nowait = true, we do not manipulate the state of the process latch; |
535 | * instead, whenever the buffer is empty and we need to read from it, we |
536 | * return SHM_MQ_WOULD_BLOCK. In this case, the caller should call this |
537 | * function again after the process latch has been set. |
538 | */ |
539 | shm_mq_result |
540 | shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait) |
541 | { |
542 | shm_mq *mq = mqh->mqh_queue; |
543 | shm_mq_result res; |
544 | Size rb = 0; |
545 | Size nbytes; |
546 | void *rawdata; |
547 | |
548 | Assert(mq->mq_receiver == MyProc); |
549 | |
550 | /* We can't receive data until the sender has attached. */ |
551 | if (!mqh->mqh_counterparty_attached) |
552 | { |
553 | if (nowait) |
554 | { |
555 | int counterparty_gone; |
556 | |
557 | /* |
558 | * We shouldn't return at this point at all unless the sender |
559 | * hasn't attached yet. However, the correct return value depends |
560 | * on whether the sender is still attached. If we first test |
561 | * whether the sender has ever attached and then test whether the |
562 | * sender has detached, there's a race condition: a sender that |
563 | * attaches and detaches very quickly might fool us into thinking |
564 | * the sender never attached at all. So, test whether our |
565 | * counterparty is definitively gone first, and only afterwards |
566 | * check whether the sender ever attached in the first place. |
567 | */ |
568 | counterparty_gone = shm_mq_counterparty_gone(mq, mqh->mqh_handle); |
569 | if (shm_mq_get_sender(mq) == NULL) |
570 | { |
571 | if (counterparty_gone) |
572 | return SHM_MQ_DETACHED; |
573 | else |
574 | return SHM_MQ_WOULD_BLOCK; |
575 | } |
576 | } |
577 | else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle) |
578 | && shm_mq_get_sender(mq) == NULL) |
579 | { |
580 | mq->mq_detached = true; |
581 | return SHM_MQ_DETACHED; |
582 | } |
583 | mqh->mqh_counterparty_attached = true; |
584 | } |
585 | |
586 | /* |
587 | * If we've consumed an amount of data greater than 1/4th of the ring |
588 | * size, mark it consumed in shared memory. We try to avoid doing this |
589 | * unnecessarily when only a small amount of data has been consumed, |
590 | * because SetLatch() is fairly expensive and we don't want to do it too |
591 | * often. |
592 | */ |
593 | if (mqh->mqh_consume_pending > mq->mq_ring_size / 4) |
594 | { |
595 | shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending); |
596 | mqh->mqh_consume_pending = 0; |
597 | } |
598 | |
599 | /* Try to read, or finish reading, the length word from the buffer. */ |
600 | while (!mqh->mqh_length_word_complete) |
601 | { |
602 | /* Try to receive the message length word. */ |
603 | Assert(mqh->mqh_partial_bytes < sizeof(Size)); |
604 | res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes, |
605 | nowait, &rb, &rawdata); |
606 | if (res != SHM_MQ_SUCCESS) |
607 | return res; |
608 | |
609 | /* |
610 | * Hopefully, we'll receive the entire message length word at once. |
611 | * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over |
612 | * multiple reads. |
613 | */ |
614 | if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size)) |
615 | { |
616 | Size needed; |
617 | |
618 | nbytes = *(Size *) rawdata; |
619 | |
620 | /* If we've already got the whole message, we're done. */ |
621 | needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes); |
622 | if (rb >= needed) |
623 | { |
624 | mqh->mqh_consume_pending += needed; |
625 | *nbytesp = nbytes; |
626 | *datap = ((char *) rawdata) + MAXALIGN(sizeof(Size)); |
627 | return SHM_MQ_SUCCESS; |
628 | } |
629 | |
630 | /* |
631 | * We don't have the whole message, but we at least have the whole |
632 | * length word. |
633 | */ |
634 | mqh->mqh_expected_bytes = nbytes; |
635 | mqh->mqh_length_word_complete = true; |
636 | mqh->mqh_consume_pending += MAXALIGN(sizeof(Size)); |
637 | rb -= MAXALIGN(sizeof(Size)); |
638 | } |
639 | else |
640 | { |
641 | Size lengthbytes; |
642 | |
643 | /* Can't be split unless bigger than required alignment. */ |
644 | Assert(sizeof(Size) > MAXIMUM_ALIGNOF); |
645 | |
646 | /* Message word is split; need buffer to reassemble. */ |
647 | if (mqh->mqh_buffer == NULL) |
648 | { |
649 | mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, |
650 | MQH_INITIAL_BUFSIZE); |
651 | mqh->mqh_buflen = MQH_INITIAL_BUFSIZE; |
652 | } |
653 | Assert(mqh->mqh_buflen >= sizeof(Size)); |
654 | |
655 | /* Copy partial length word; remember to consume it. */ |
656 | if (mqh->mqh_partial_bytes + rb > sizeof(Size)) |
657 | lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes; |
658 | else |
659 | lengthbytes = rb; |
660 | memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, |
661 | lengthbytes); |
662 | mqh->mqh_partial_bytes += lengthbytes; |
663 | mqh->mqh_consume_pending += MAXALIGN(lengthbytes); |
664 | rb -= lengthbytes; |
665 | |
666 | /* If we now have the whole word, we're ready to read payload. */ |
667 | if (mqh->mqh_partial_bytes >= sizeof(Size)) |
668 | { |
669 | Assert(mqh->mqh_partial_bytes == sizeof(Size)); |
670 | mqh->mqh_expected_bytes = *(Size *) mqh->mqh_buffer; |
671 | mqh->mqh_length_word_complete = true; |
672 | mqh->mqh_partial_bytes = 0; |
673 | } |
674 | } |
675 | } |
676 | nbytes = mqh->mqh_expected_bytes; |
677 | |
678 | if (mqh->mqh_partial_bytes == 0) |
679 | { |
680 | /* |
681 | * Try to obtain the whole message in a single chunk. If this works, |
682 | * we need not copy the data and can return a pointer directly into |
683 | * shared memory. |
684 | */ |
685 | res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata); |
686 | if (res != SHM_MQ_SUCCESS) |
687 | return res; |
688 | if (rb >= nbytes) |
689 | { |
690 | mqh->mqh_length_word_complete = false; |
691 | mqh->mqh_consume_pending += MAXALIGN(nbytes); |
692 | *nbytesp = nbytes; |
693 | *datap = rawdata; |
694 | return SHM_MQ_SUCCESS; |
695 | } |
696 | |
697 | /* |
698 | * The message has wrapped the buffer. We'll need to copy it in order |
699 | * to return it to the client in one chunk. First, make sure we have |
700 | * a large enough buffer available. |
701 | */ |
702 | if (mqh->mqh_buflen < nbytes) |
703 | { |
704 | Size newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE); |
705 | |
706 | while (newbuflen < nbytes) |
707 | newbuflen *= 2; |
708 | |
709 | if (mqh->mqh_buffer != NULL) |
710 | { |
711 | pfree(mqh->mqh_buffer); |
712 | mqh->mqh_buffer = NULL; |
713 | mqh->mqh_buflen = 0; |
714 | } |
715 | mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context, newbuflen); |
716 | mqh->mqh_buflen = newbuflen; |
717 | } |
718 | } |
719 | |
720 | /* Loop until we've copied the entire message. */ |
721 | for (;;) |
722 | { |
723 | Size still_needed; |
724 | |
725 | /* Copy as much as we can. */ |
726 | Assert(mqh->mqh_partial_bytes + rb <= nbytes); |
727 | memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb); |
728 | mqh->mqh_partial_bytes += rb; |
729 | |
730 | /* |
731 | * Update count of bytes that can be consumed, accounting for |
732 | * alignment padding. Note that this will never actually insert any |
733 | * padding except at the end of a message, because the buffer size is |
734 | * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well. |
735 | */ |
736 | Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb)); |
737 | mqh->mqh_consume_pending += MAXALIGN(rb); |
738 | |
739 | /* If we got all the data, exit the loop. */ |
740 | if (mqh->mqh_partial_bytes >= nbytes) |
741 | break; |
742 | |
743 | /* Wait for some more data. */ |
744 | still_needed = nbytes - mqh->mqh_partial_bytes; |
745 | res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata); |
746 | if (res != SHM_MQ_SUCCESS) |
747 | return res; |
748 | if (rb > still_needed) |
749 | rb = still_needed; |
750 | } |
751 | |
752 | /* Return the complete message, and reset for next message. */ |
753 | *nbytesp = nbytes; |
754 | *datap = mqh->mqh_buffer; |
755 | mqh->mqh_length_word_complete = false; |
756 | mqh->mqh_partial_bytes = 0; |
757 | return SHM_MQ_SUCCESS; |
758 | } |
759 | |
760 | /* |
761 | * Wait for the other process that's supposed to use this queue to attach |
762 | * to it. |
763 | * |
764 | * The return value is SHM_MQ_DETACHED if the worker has already detached or |
765 | * if it dies; it is SHM_MQ_SUCCESS if we detect that the worker has attached. |
766 | * Note that we will only be able to detect that the worker has died before |
767 | * attaching if a background worker handle was passed to shm_mq_attach(). |
768 | */ |
769 | shm_mq_result |
770 | shm_mq_wait_for_attach(shm_mq_handle *mqh) |
771 | { |
772 | shm_mq *mq = mqh->mqh_queue; |
773 | PGPROC **victim; |
774 | |
775 | if (shm_mq_get_receiver(mq) == MyProc) |
776 | victim = &mq->mq_sender; |
777 | else |
778 | { |
779 | Assert(shm_mq_get_sender(mq) == MyProc); |
780 | victim = &mq->mq_receiver; |
781 | } |
782 | |
783 | if (shm_mq_wait_internal(mq, victim, mqh->mqh_handle)) |
784 | return SHM_MQ_SUCCESS; |
785 | else |
786 | return SHM_MQ_DETACHED; |
787 | } |
788 | |
789 | /* |
790 | * Detach from a shared message queue, and destroy the shm_mq_handle. |
791 | */ |
792 | void |
793 | shm_mq_detach(shm_mq_handle *mqh) |
794 | { |
795 | /* Notify counterparty that we're outta here. */ |
796 | shm_mq_detach_internal(mqh->mqh_queue); |
797 | |
798 | /* Cancel on_dsm_detach callback, if any. */ |
799 | if (mqh->mqh_segment) |
800 | cancel_on_dsm_detach(mqh->mqh_segment, |
801 | shm_mq_detach_callback, |
802 | PointerGetDatum(mqh->mqh_queue)); |
803 | |
804 | /* Release local memory associated with handle. */ |
805 | if (mqh->mqh_buffer != NULL) |
806 | pfree(mqh->mqh_buffer); |
807 | pfree(mqh); |
808 | } |
809 | |
810 | /* |
811 | * Notify counterparty that we're detaching from shared message queue. |
812 | * |
813 | * The purpose of this function is to make sure that the process |
814 | * with which we're communicating doesn't block forever waiting for us to |
815 | * fill or drain the queue once we've lost interest. When the sender |
816 | * detaches, the receiver can read any messages remaining in the queue; |
817 | * further reads will return SHM_MQ_DETACHED. If the receiver detaches, |
818 | * further attempts to send messages will likewise return SHM_MQ_DETACHED. |
819 | * |
820 | * This is separated out from shm_mq_detach() because if the on_dsm_detach |
821 | * callback fires, we only want to do this much. We do not try to touch |
822 | * the local shm_mq_handle, as it may have been pfree'd already. |
823 | */ |
824 | static void |
825 | shm_mq_detach_internal(shm_mq *mq) |
826 | { |
827 | PGPROC *victim; |
828 | |
829 | SpinLockAcquire(&mq->mq_mutex); |
830 | if (mq->mq_sender == MyProc) |
831 | victim = mq->mq_receiver; |
832 | else |
833 | { |
834 | Assert(mq->mq_receiver == MyProc); |
835 | victim = mq->mq_sender; |
836 | } |
837 | mq->mq_detached = true; |
838 | SpinLockRelease(&mq->mq_mutex); |
839 | |
840 | if (victim != NULL) |
841 | SetLatch(&victim->procLatch); |
842 | } |
843 | |
844 | /* |
845 | * Get the shm_mq from handle. |
846 | */ |
847 | shm_mq * |
848 | shm_mq_get_queue(shm_mq_handle *mqh) |
849 | { |
850 | return mqh->mqh_queue; |
851 | } |
852 | |
853 | /* |
854 | * Write bytes into a shared message queue. |
855 | */ |
856 | static shm_mq_result |
857 | shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data, |
858 | bool nowait, Size *bytes_written) |
859 | { |
860 | shm_mq *mq = mqh->mqh_queue; |
861 | Size sent = 0; |
862 | uint64 used; |
863 | Size ringsize = mq->mq_ring_size; |
864 | Size available; |
865 | |
866 | while (sent < nbytes) |
867 | { |
868 | uint64 rb; |
869 | uint64 wb; |
870 | |
871 | /* Compute number of ring buffer bytes used and available. */ |
872 | rb = pg_atomic_read_u64(&mq->mq_bytes_read); |
873 | wb = pg_atomic_read_u64(&mq->mq_bytes_written); |
874 | Assert(wb >= rb); |
875 | used = wb - rb; |
876 | Assert(used <= ringsize); |
877 | available = Min(ringsize - used, nbytes - sent); |
878 | |
879 | /* |
880 | * Bail out if the queue has been detached. Note that we would be in |
881 | * trouble if the compiler decided to cache the value of |
882 | * mq->mq_detached in a register or on the stack across loop |
883 | * iterations. It probably shouldn't do that anyway since we'll |
884 | * always return, call an external function that performs a system |
885 | * call, or reach a memory barrier at some point later in the loop, |
886 | * but just to be sure, insert a compiler barrier here. |
887 | */ |
888 | pg_compiler_barrier(); |
889 | if (mq->mq_detached) |
890 | { |
891 | *bytes_written = sent; |
892 | return SHM_MQ_DETACHED; |
893 | } |
894 | |
895 | if (available == 0 && !mqh->mqh_counterparty_attached) |
896 | { |
897 | /* |
898 | * The queue is full, so if the receiver isn't yet known to be |
899 | * attached, we must wait for that to happen. |
900 | */ |
901 | if (nowait) |
902 | { |
903 | if (shm_mq_counterparty_gone(mq, mqh->mqh_handle)) |
904 | { |
905 | *bytes_written = sent; |
906 | return SHM_MQ_DETACHED; |
907 | } |
908 | if (shm_mq_get_receiver(mq) == NULL) |
909 | { |
910 | *bytes_written = sent; |
911 | return SHM_MQ_WOULD_BLOCK; |
912 | } |
913 | } |
914 | else if (!shm_mq_wait_internal(mq, &mq->mq_receiver, |
915 | mqh->mqh_handle)) |
916 | { |
917 | mq->mq_detached = true; |
918 | *bytes_written = sent; |
919 | return SHM_MQ_DETACHED; |
920 | } |
921 | mqh->mqh_counterparty_attached = true; |
922 | |
923 | /* |
924 | * The receiver may have read some data after attaching, so we |
925 | * must not wait without rechecking the queue state. |
926 | */ |
927 | } |
928 | else if (available == 0) |
929 | { |
930 | /* |
			 * Since mqh->mqh_counterparty_attached is known to be true at this
932 | * point, mq_receiver has been set, and it can't change once set. |
933 | * Therefore, we can read it without acquiring the spinlock. |
934 | */ |
935 | Assert(mqh->mqh_counterparty_attached); |
936 | SetLatch(&mq->mq_receiver->procLatch); |
937 | |
938 | /* Skip manipulation of our latch if nowait = true. */ |
939 | if (nowait) |
940 | { |
941 | *bytes_written = sent; |
942 | return SHM_MQ_WOULD_BLOCK; |
943 | } |
944 | |
945 | /* |
946 | * Wait for our latch to be set. It might already be set for some |
947 | * unrelated reason, but that'll just result in one extra trip |
948 | * through the loop. It's worth it to avoid resetting the latch |
949 | * at top of loop, because setting an already-set latch is much |
950 | * cheaper than setting one that has been reset. |
951 | */ |
952 | (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, |
953 | WAIT_EVENT_MQ_SEND); |
954 | |
955 | /* Reset the latch so we don't spin. */ |
956 | ResetLatch(MyLatch); |
957 | |
958 | /* An interrupt may have occurred while we were waiting. */ |
959 | CHECK_FOR_INTERRUPTS(); |
960 | } |
961 | else |
962 | { |
963 | Size offset; |
964 | Size sendnow; |
965 | |
966 | offset = wb % (uint64) ringsize; |
967 | sendnow = Min(available, ringsize - offset); |
968 | |
969 | /* |
970 | * Write as much data as we can via a single memcpy(). Make sure |
971 | * these writes happen after the read of mq_bytes_read, above. |
972 | * This barrier pairs with the one in shm_mq_inc_bytes_read. |
973 | * (Since we're separating the read of mq_bytes_read from a |
974 | * subsequent write to mq_ring, we need a full barrier here.) |
975 | */ |
976 | pg_memory_barrier(); |
977 | memcpy(&mq->mq_ring[mq->mq_ring_offset + offset], |
978 | (char *) data + sent, sendnow); |
979 | sent += sendnow; |
980 | |
981 | /* |
982 | * Update count of bytes written, with alignment padding. Note |
983 | * that this will never actually insert any padding except at the |
984 | * end of a run of bytes, because the buffer size is a multiple of |
			 * MAXIMUM_ALIGNOF, and each read and write is as well.
986 | */ |
987 | Assert(sent == nbytes || sendnow == MAXALIGN(sendnow)); |
988 | shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow)); |
989 | |
990 | /* |
991 | * For efficiency, we don't set the reader's latch here. We'll do |
992 | * that only when the buffer fills up or after writing an entire |
993 | * message. |
994 | */ |
995 | } |
996 | } |
997 | |
998 | *bytes_written = sent; |
999 | return SHM_MQ_SUCCESS; |
1000 | } |
1001 | |
1002 | /* |
1003 | * Wait until at least *nbytesp bytes are available to be read from the |
1004 | * shared message queue, or until the buffer wraps around. If the queue is |
1005 | * detached, returns SHM_MQ_DETACHED. If nowait is specified and a wait |
1006 | * would be required, returns SHM_MQ_WOULD_BLOCK. Otherwise, *datap is set |
1007 | * to the location at which data bytes can be read, *nbytesp is set to the |
1008 | * number of bytes which can be read at that address, and the return value |
1009 | * is SHM_MQ_SUCCESS. |
1010 | */ |
1011 | static shm_mq_result |
1012 | shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait, |
1013 | Size *nbytesp, void **datap) |
1014 | { |
1015 | shm_mq *mq = mqh->mqh_queue; |
1016 | Size ringsize = mq->mq_ring_size; |
1017 | uint64 used; |
1018 | uint64 written; |
1019 | |
1020 | for (;;) |
1021 | { |
1022 | Size offset; |
1023 | uint64 read; |
1024 | |
1025 | /* Get bytes written, so we can compute what's available to read. */ |
1026 | written = pg_atomic_read_u64(&mq->mq_bytes_written); |
1027 | |
1028 | /* |
1029 | * Get bytes read. Include bytes we could consume but have not yet |
1030 | * consumed. |
1031 | */ |
1032 | read = pg_atomic_read_u64(&mq->mq_bytes_read) + |
1033 | mqh->mqh_consume_pending; |
1034 | used = written - read; |
1035 | Assert(used <= ringsize); |
1036 | offset = read % (uint64) ringsize; |
1037 | |
1038 | /* If we have enough data or buffer has wrapped, we're done. */ |
1039 | if (used >= bytes_needed || offset + used >= ringsize) |
1040 | { |
1041 | *nbytesp = Min(used, ringsize - offset); |
1042 | *datap = &mq->mq_ring[mq->mq_ring_offset + offset]; |
1043 | |
1044 | /* |
1045 | * Separate the read of mq_bytes_written, above, from caller's |
1046 | * attempt to read the data itself. Pairs with the barrier in |
1047 | * shm_mq_inc_bytes_written. |
1048 | */ |
1049 | pg_read_barrier(); |
1050 | return SHM_MQ_SUCCESS; |
1051 | } |
1052 | |
1053 | /* |
1054 | * Fall out before waiting if the queue has been detached. |
1055 | * |
1056 | * Note that we don't check for this until *after* considering whether |
1057 | * the data already available is enough, since the receiver can finish |
1058 | * receiving a message stored in the buffer even after the sender has |
1059 | * detached. |
1060 | */ |
1061 | if (mq->mq_detached) |
1062 | { |
1063 | /* |
1064 | * If the writer advanced mq_bytes_written and then set |
1065 | * mq_detached, we might not have read the final value of |
1066 | * mq_bytes_written above. Insert a read barrier and then check |
1067 | * again if mq_bytes_written has advanced. |
1068 | */ |
1069 | pg_read_barrier(); |
1070 | if (written != pg_atomic_read_u64(&mq->mq_bytes_written)) |
1071 | continue; |
1072 | |
1073 | return SHM_MQ_DETACHED; |
1074 | } |
1075 | |
1076 | /* |
1077 | * We didn't get enough data to satisfy the request, so mark any data |
1078 | * previously-consumed as read to make more buffer space. |
1079 | */ |
1080 | if (mqh->mqh_consume_pending > 0) |
1081 | { |
1082 | shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending); |
1083 | mqh->mqh_consume_pending = 0; |
1084 | } |
1085 | |
1086 | /* Skip manipulation of our latch if nowait = true. */ |
1087 | if (nowait) |
1088 | return SHM_MQ_WOULD_BLOCK; |
1089 | |
1090 | /* |
1091 | * Wait for our latch to be set. It might already be set for some |
1092 | * unrelated reason, but that'll just result in one extra trip through |
1093 | * the loop. It's worth it to avoid resetting the latch at top of |
1094 | * loop, because setting an already-set latch is much cheaper than |
1095 | * setting one that has been reset. |
1096 | */ |
1097 | (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, |
1098 | WAIT_EVENT_MQ_RECEIVE); |
1099 | |
1100 | /* Reset the latch so we don't spin. */ |
1101 | ResetLatch(MyLatch); |
1102 | |
1103 | /* An interrupt may have occurred while we were waiting. */ |
1104 | CHECK_FOR_INTERRUPTS(); |
1105 | } |
1106 | } |
1107 | |
1108 | /* |
1109 | * Test whether a counterparty who may not even be alive yet is definitely gone. |
1110 | */ |
1111 | static bool |
1112 | shm_mq_counterparty_gone(shm_mq *mq, BackgroundWorkerHandle *handle) |
1113 | { |
1114 | pid_t pid; |
1115 | |
1116 | /* If the queue has been detached, counterparty is definitely gone. */ |
1117 | if (mq->mq_detached) |
1118 | return true; |
1119 | |
1120 | /* If there's a handle, check worker status. */ |
1121 | if (handle != NULL) |
1122 | { |
1123 | BgwHandleStatus status; |
1124 | |
1125 | /* Check for unexpected worker death. */ |
1126 | status = GetBackgroundWorkerPid(handle, &pid); |
1127 | if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED) |
1128 | { |
1129 | /* Mark it detached, just to make it official. */ |
1130 | mq->mq_detached = true; |
1131 | return true; |
1132 | } |
1133 | } |
1134 | |
1135 | /* Counterparty is not definitively gone. */ |
1136 | return false; |
1137 | } |
1138 | |
1139 | /* |
1140 | * This is used when a process is waiting for its counterpart to attach to the |
1141 | * queue. We exit when the other process attaches as expected, or, if |
1142 | * handle != NULL, when the referenced background process or the postmaster |
1143 | * dies. Note that if handle == NULL, and the process fails to attach, we'll |
1144 | * potentially get stuck here forever waiting for a process that may never |
1145 | * start. We do check for interrupts, though. |
1146 | * |
1147 | * ptr is a pointer to the memory address that we're expecting to become |
1148 | * non-NULL when our counterpart attaches to the queue. |
1149 | */ |
1150 | static bool |
1151 | shm_mq_wait_internal(shm_mq *mq, PGPROC **ptr, BackgroundWorkerHandle *handle) |
1152 | { |
1153 | bool result = false; |
1154 | |
1155 | for (;;) |
1156 | { |
1157 | BgwHandleStatus status; |
1158 | pid_t pid; |
1159 | |
1160 | /* Acquire the lock just long enough to check the pointer. */ |
1161 | SpinLockAcquire(&mq->mq_mutex); |
1162 | result = (*ptr != NULL); |
1163 | SpinLockRelease(&mq->mq_mutex); |
1164 | |
1165 | /* Fail if detached; else succeed if initialized. */ |
1166 | if (mq->mq_detached) |
1167 | { |
1168 | result = false; |
1169 | break; |
1170 | } |
1171 | if (result) |
1172 | break; |
1173 | |
1174 | if (handle != NULL) |
1175 | { |
1176 | /* Check for unexpected worker death. */ |
1177 | status = GetBackgroundWorkerPid(handle, &pid); |
1178 | if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED) |
1179 | { |
1180 | result = false; |
1181 | break; |
1182 | } |
1183 | } |
1184 | |
1185 | /* Wait to be signalled. */ |
1186 | (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, |
1187 | WAIT_EVENT_MQ_INTERNAL); |
1188 | |
1189 | /* Reset the latch so we don't spin. */ |
1190 | ResetLatch(MyLatch); |
1191 | |
1192 | /* An interrupt may have occurred while we were waiting. */ |
1193 | CHECK_FOR_INTERRUPTS(); |
1194 | } |
1195 | |
1196 | return result; |
1197 | } |
1198 | |
1199 | /* |
1200 | * Increment the number of bytes read. |
1201 | */ |
1202 | static void |
1203 | shm_mq_inc_bytes_read(shm_mq *mq, Size n) |
1204 | { |
1205 | PGPROC *sender; |
1206 | |
1207 | /* |
1208 | * Separate prior reads of mq_ring from the increment of mq_bytes_read |
1209 | * which follows. This pairs with the full barrier in |
1210 | * shm_mq_send_bytes(). We only need a read barrier here because the |
1211 | * increment of mq_bytes_read is actually a read followed by a dependent |
1212 | * write. |
1213 | */ |
1214 | pg_read_barrier(); |
1215 | |
1216 | /* |
1217 | * There's no need to use pg_atomic_fetch_add_u64 here, because nobody |
1218 | * else can be changing this value. This method should be cheaper. |
1219 | */ |
1220 | pg_atomic_write_u64(&mq->mq_bytes_read, |
1221 | pg_atomic_read_u64(&mq->mq_bytes_read) + n); |
1222 | |
1223 | /* |
1224 | * We shouldn't have any bytes to read without a sender, so we can read |
1225 | * mq_sender here without a lock. Once it's initialized, it can't change. |
1226 | */ |
1227 | sender = mq->mq_sender; |
1228 | Assert(sender != NULL); |
1229 | SetLatch(&sender->procLatch); |
1230 | } |
1231 | |
1232 | /* |
1233 | * Increment the number of bytes written. |
1234 | */ |
1235 | static void |
1236 | shm_mq_inc_bytes_written(shm_mq *mq, Size n) |
1237 | { |
1238 | /* |
	 * Separate prior writes of mq_ring from the write of mq_bytes_written
1240 | * which we're about to do. Pairs with the read barrier found in |
1241 | * shm_mq_receive_bytes. |
1242 | */ |
1243 | pg_write_barrier(); |
1244 | |
1245 | /* |
1246 | * There's no need to use pg_atomic_fetch_add_u64 here, because nobody |
1247 | * else can be changing this value. This method avoids taking the bus |
1248 | * lock unnecessarily. |
1249 | */ |
1250 | pg_atomic_write_u64(&mq->mq_bytes_written, |
1251 | pg_atomic_read_u64(&mq->mq_bytes_written) + n); |
1252 | } |
1253 | |
/* Shim for the on_dsm_detach callback. */
1255 | static void |
1256 | shm_mq_detach_callback(dsm_segment *seg, Datum arg) |
1257 | { |
1258 | shm_mq *mq = (shm_mq *) DatumGetPointer(arg); |
1259 | |
1260 | shm_mq_detach_internal(mq); |
1261 | } |
1262 | |