1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * async.c |
4 | * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN |
5 | * |
6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
7 | * Portions Copyright (c) 1994, Regents of the University of California |
8 | * |
9 | * IDENTIFICATION |
10 | * src/backend/commands/async.c |
11 | * |
12 | *------------------------------------------------------------------------- |
13 | */ |
14 | |
15 | /*------------------------------------------------------------------------- |
16 | * Async Notification Model as of 9.0: |
17 | * |
18 | * 1. Multiple backends on same machine. Multiple backends listening on |
19 | * several channels. (Channels are also called "conditions" in other |
20 | * parts of the code.) |
21 | * |
22 | * 2. There is one central queue in disk-based storage (directory pg_notify/), |
23 | * with actively-used pages mapped into shared memory by the slru.c module. |
24 | * All notification messages are placed in the queue and later read out |
25 | * by listening backends. |
26 | * |
27 | * There is no central knowledge of which backend listens on which channel; |
28 | * every backend has its own list of interesting channels. |
29 | * |
30 | * Although there is only one queue, notifications are treated as being |
31 | * database-local; this is done by including the sender's database OID |
32 | * in each notification message. Listening backends ignore messages |
33 | * that don't match their database OID. This is important because it |
34 | * ensures senders and receivers have the same database encoding and won't |
35 | * misinterpret non-ASCII text in the channel name or payload string. |
36 | * |
37 | * Since notifications are not expected to survive database crashes, |
38 | * we can simply clean out the pg_notify data at any reboot, and there |
39 | * is no need for WAL support or fsync'ing. |
40 | * |
41 | * 3. Every backend that is listening on at least one channel registers by |
42 | * entering its PID into the array in AsyncQueueControl. It then scans all |
43 | * incoming notifications in the central queue and first compares the |
44 | * database OID of the notification with its own database OID and then |
45 | * compares the notified channel with the list of channels that it listens |
46 | * to. In case there is a match it delivers the notification event to its |
47 | * frontend. Non-matching events are simply skipped. |
48 | * |
49 | * 4. The NOTIFY statement (routine Async_Notify) stores the notification in |
50 | * a backend-local list which will not be processed until transaction end. |
51 | * |
52 | * Duplicate notifications from the same transaction are sent out as one |
53 | * notification only. This is done to save work when for example a trigger |
54 | * on a 2 million row table fires a notification for each row that has been |
55 | * changed. If the application needs to receive every single notification |
56 | * that has been sent, it can easily add some unique string into the extra |
57 | * payload parameter. |
58 | * |
59 | * When the transaction is ready to commit, PreCommit_Notify() adds the |
60 | * pending notifications to the head of the queue. The head pointer of the |
61 | * queue always points to the next free position and a position is just a |
62 | * page number and the offset in that page. This is done before marking the |
63 | * transaction as committed in clog. If we run into problems writing the |
64 | * notifications, we can still call elog(ERROR, ...) and the transaction |
65 | * will roll back. |
66 | * |
67 | * Once we have put all of the notifications into the queue, we return to |
68 | * CommitTransaction() which will then do the actual transaction commit. |
69 | * |
70 | * After commit we are called another time (AtCommit_Notify()). Here we |
71 | * make the actual updates to the effective listen state (listenChannels). |
72 | * |
73 | * Finally, after we are out of the transaction altogether, we check if |
74 | * we need to signal listening backends. In SignalBackends() we scan the |
75 | * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal |
76 | * to every listening backend (we don't know which backend is listening on |
77 | * which channel so we must signal them all). We can exclude backends that |
78 | * are already up to date, though. We don't bother with a self-signal |
79 | * either, but just process the queue directly. |
80 | * |
81 | * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler |
82 | * sets the process's latch, which triggers the event to be processed |
83 | * immediately if this backend is idle (i.e., it is waiting for a frontend |
84 | * command and is not within a transaction block. C.f. |
85 | * ProcessClientReadInterrupt()). Otherwise the handler may only set a |
86 | * flag, which will cause the processing to occur just before we next go |
87 | * idle. |
88 | * |
89 | * Inbound-notify processing consists of reading all of the notifications |
90 | * that have arrived since scanning last time. We read every notification |
91 | * until we reach either a notification from an uncommitted transaction or |
92 | * the head pointer's position. Then we check if we were the laziest |
93 | * backend: if our pointer is set to the same position as the global tail |
94 | * pointer is set, then we move the global tail pointer ahead to where the |
95 | * second-laziest backend is (in general, we take the MIN of the current |
96 | * head position and all active backends' new tail pointers). Whenever we |
97 | * move the global tail pointer we also truncate now-unused pages (i.e., |
98 | * delete files in pg_notify/ that are no longer used). |
99 | * |
100 | * An application that listens on the same channel it notifies will get |
101 | * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, |
102 | * by comparing be_pid in the NOTIFY message to the application's own backend's |
103 | * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the |
104 | * frontend during startup.) The above design guarantees that notifies from |
105 | * other backends will never be missed by ignoring self-notifies. |
106 | * |
107 | * The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS) |
108 | * can be varied without affecting anything but performance. The maximum |
109 | * amount of notification data that can be queued at one time is determined |
110 | * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below. |
111 | *------------------------------------------------------------------------- |
112 | */ |
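/*
 * For orientation, a rough sketch of the call sequence for a transaction
 * that executes NOTIFY (error handling and locking omitted):
 *
 *		Async_Notify("chan", "msg")    -- append to pendingNotifies
 *		    ... rest of the transaction ...
 *		PreCommit_Notify()             -- write entries into the pg_notify queue
 *		    (transaction commits in clog)
 *		AtCommit_Notify()              -- apply LISTEN/UNLISTEN, clear local state
 *		ProcessCompletedNotifies()     -- signal listeners, deliver self-notifies
 *
 * Each step is described in detail in the numbered sections above.
 */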
113 | |
114 | #include "postgres.h" |
115 | |
116 | #include <limits.h> |
117 | #include <unistd.h> |
118 | #include <signal.h> |
119 | |
120 | #include "access/parallel.h" |
121 | #include "access/slru.h" |
122 | #include "access/transam.h" |
123 | #include "access/xact.h" |
124 | #include "catalog/pg_database.h" |
125 | #include "commands/async.h" |
126 | #include "funcapi.h" |
127 | #include "libpq/libpq.h" |
128 | #include "libpq/pqformat.h" |
129 | #include "miscadmin.h" |
130 | #include "storage/ipc.h" |
131 | #include "storage/lmgr.h" |
132 | #include "storage/proc.h" |
133 | #include "storage/procarray.h" |
134 | #include "storage/procsignal.h" |
135 | #include "storage/sinval.h" |
136 | #include "tcop/tcopprot.h" |
137 | #include "utils/builtins.h" |
138 | #include "utils/memutils.h" |
139 | #include "utils/ps_status.h" |
140 | #include "utils/snapmgr.h" |
141 | #include "utils/timestamp.h" |
142 | |
143 | |
144 | /* |
145 | * Maximum size of a NOTIFY payload, including terminating NULL. This |
146 | * must be kept small enough so that a notification message fits on one |
147 | * SLRU page. The magic fudge factor here is noncritical as long as it's |
148 | * more than AsyncQueueEntryEmptySize --- we make it significantly bigger |
149 | * than that, so changes in that data structure won't affect user-visible |
150 | * restrictions. |
151 | */ |
152 | #define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128) |
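/*
 * For example, with the default BLCKSZ of 8192 and NAMEDATALEN of 64, this
 * allows payloads of up to 8000 bytes (including the terminating NULL).
 */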
153 | |
154 | /* |
155 | * Struct representing an entry in the global notify queue |
156 | * |
157 | * This struct declaration has the maximal length, but in a real queue entry |
158 | * the data area is only big enough for the actual channel and payload strings |
159 | * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible |
160 | * entry size, if both channel and payload strings are empty (but note it |
161 | * doesn't include alignment padding). |
162 | * |
163 | * The "length" field should always be rounded up to the next QUEUEALIGN |
164 | * multiple so that all fields are properly aligned. |
165 | */ |
166 | typedef struct AsyncQueueEntry |
167 | { |
168 | int length; /* total allocated length of entry */ |
169 | Oid dboid; /* sender's database OID */ |
170 | TransactionId xid; /* sender's XID */ |
171 | int32 srcPid; /* sender's PID */ |
172 | char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; |
173 | } AsyncQueueEntry; |
174 | |
175 | /* Currently, no field of AsyncQueueEntry requires more than int alignment */ |
176 | #define QUEUEALIGN(len) INTALIGN(len) |
177 | |
178 | #define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2) |
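/*
 * As a worked example: with 4-byte length, dboid, xid and srcPid fields, the
 * fixed header is 16 bytes and AsyncQueueEntryEmptySize is 18.  A NOTIFY on
 * channel "foo" with payload "bar" is stored as that header followed by
 * "foo\0bar\0", i.e. 18 + 3 + 3 = 24 bytes, which QUEUEALIGN leaves
 * unchanged at 24.
 */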
179 | |
180 | /* |
181 | * Struct describing a queue position, and assorted macros for working with it |
182 | */ |
183 | typedef struct QueuePosition |
184 | { |
185 | int page; /* SLRU page number */ |
186 | int offset; /* byte offset within page */ |
187 | } QueuePosition; |
188 | |
189 | #define QUEUE_POS_PAGE(x) ((x).page) |
190 | #define QUEUE_POS_OFFSET(x) ((x).offset) |
191 | |
192 | #define SET_QUEUE_POS(x,y,z) \ |
193 | do { \ |
194 | (x).page = (y); \ |
195 | (x).offset = (z); \ |
196 | } while (0) |
197 | |
198 | #define QUEUE_POS_EQUAL(x,y) \ |
199 | ((x).page == (y).page && (x).offset == (y).offset) |
200 | |
201 | /* choose logically smaller QueuePosition */ |
202 | #define QUEUE_POS_MIN(x,y) \ |
203 | (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \ |
204 | (x).page != (y).page ? (y) : \ |
205 | (x).offset < (y).offset ? (x) : (y)) |
206 | |
207 | /* choose logically larger QueuePosition */ |
208 | #define QUEUE_POS_MAX(x,y) \ |
209 | (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \ |
210 | (x).page != (y).page ? (x) : \ |
211 | (x).offset > (y).offset ? (x) : (y)) |
212 | |
213 | /* |
214 | * Struct describing a listening backend's status |
215 | */ |
216 | typedef struct QueueBackendStatus |
217 | { |
218 | int32 pid; /* either a PID or InvalidPid */ |
219 | Oid dboid; /* backend's database OID, or InvalidOid */ |
220 | QueuePosition pos; /* backend has read queue up to here */ |
221 | } QueueBackendStatus; |
222 | |
223 | /* |
224 | * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff) |
225 | * |
226 | * The AsyncQueueControl structure is protected by the AsyncQueueLock. |
227 | * |
228 | * When holding the lock in SHARED mode, backends may only inspect their own |
229 | * entries as well as the head and tail pointers. Consequently we can allow a |
230 | * backend to update its own record while holding only SHARED lock (since no |
231 | * other backend will inspect it). |
232 | * |
233 | * When holding the lock in EXCLUSIVE mode, backends can inspect the entries |
234 | * of other backends and also change the head and tail pointers. |
235 | * |
236 | * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers. |
237 | * In order to avoid deadlocks, whenever we need both locks, we always first |
238 | * get AsyncQueueLock and then AsyncCtlLock. |
239 | * |
240 | * Each backend uses the backend[] array entry with index equal to its |
241 | * BackendId (which can range from 1 to MaxBackends). We rely on this to make |
242 | * SendProcSignal fast. |
243 | */ |
244 | typedef struct AsyncQueueControl |
245 | { |
246 | QueuePosition head; /* head points to the next free location */ |
247 | QueuePosition tail; /* the global tail is equivalent to the pos of |
248 | * the "slowest" backend */ |
249 | TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ |
250 | QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; |
251 | /* backend[0] is not used; used entries are from [1] to [MaxBackends] */ |
252 | } AsyncQueueControl; |
253 | |
254 | static AsyncQueueControl *asyncQueueControl; |
255 | |
256 | #define QUEUE_HEAD (asyncQueueControl->head) |
257 | #define QUEUE_TAIL (asyncQueueControl->tail) |
258 | #define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid) |
259 | #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) |
260 | #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) |
261 | |
262 | /* |
263 | * The SLRU buffer area through which we access the notification queue |
264 | */ |
265 | static SlruCtlData AsyncCtlData; |
266 | |
267 | #define AsyncCtl (&AsyncCtlData) |
268 | #define QUEUE_PAGESIZE BLCKSZ |
269 | #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ |
270 | |
271 | /* |
272 | * slru.c currently assumes that all filenames are four characters of hex |
273 | * digits. That means that we can use segments 0000 through FFFF. |
274 | * Each segment contains SLRU_PAGES_PER_SEGMENT pages which gives us |
275 | * the pages from 0 to SLRU_PAGES_PER_SEGMENT * 0x10000 - 1. |
276 | * |
277 | * It's of course possible to enhance slru.c, but this gives us so much |
278 | * space already that it doesn't seem worth the trouble. |
279 | * |
280 | * The most data we can have in the queue at a time is QUEUE_MAX_PAGE/2 |
281 | * pages, because more than that would confuse slru.c into thinking there |
282 | * was a wraparound condition. With the default BLCKSZ this means there |
283 | * can be up to 8GB of queued-and-not-read data. |
284 | * |
285 | * Note: it's possible to redefine QUEUE_MAX_PAGE with a smaller multiple of |
286 | * SLRU_PAGES_PER_SEGMENT, for easier testing of queue-full behaviour. |
287 | */ |
288 | #define QUEUE_MAX_PAGE (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1) |
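/*
 * To make the "8GB" figure above concrete: with SLRU_PAGES_PER_SEGMENT = 32,
 * QUEUE_MAX_PAGE + 1 is 0x200000, so up to about 0x100000 pages of BLCKSZ
 * (8192) bytes each can be in use at once, i.e. roughly 8GB.
 */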
289 | |
290 | /* |
291 | * listenChannels identifies the channels we are actually listening to |
292 | * (ie, have committed a LISTEN on). It is a simple list of channel names, |
293 | * allocated in TopMemoryContext. |
294 | */ |
295 | static List *listenChannels = NIL; /* list of C strings */ |
296 | |
297 | /* |
298 | * State for pending LISTEN/UNLISTEN actions consists of an ordered list of |
299 | * all actions requested in the current transaction. As explained above, |
300 | * we don't actually change listenChannels until we reach transaction commit. |
301 | * |
302 | * The list is kept in CurTransactionContext. In subtransactions, each |
303 | * subtransaction has its own list in its own CurTransactionContext, but |
304 | * successful subtransactions attach their lists to their parent's list. |
305 | * Failed subtransactions simply discard their lists. |
306 | */ |
307 | typedef enum |
308 | { |
309 | LISTEN_LISTEN, |
310 | LISTEN_UNLISTEN, |
311 | LISTEN_UNLISTEN_ALL |
312 | } ListenActionKind; |
313 | |
314 | typedef struct |
315 | { |
316 | ListenActionKind action; |
317 | char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */ |
318 | } ListenAction; |
319 | |
320 | static List *pendingActions = NIL; /* list of ListenAction */ |
321 | |
322 | static List *upperPendingActions = NIL; /* list of upper-xact lists */ |
323 | |
324 | /* |
325 | * State for outbound notifies consists of a list of all channels+payloads |
326 | * NOTIFYed in the current transaction. We do not actually perform a NOTIFY |
327 | * until and unless the transaction commits. pendingNotifies is NIL if no |
328 | * NOTIFYs have been done in the current transaction. |
329 | * |
330 | * The list is kept in CurTransactionContext. In subtransactions, each |
331 | * subtransaction has its own list in its own CurTransactionContext, but |
332 | * successful subtransactions attach their lists to their parent's list. |
333 | * Failed subtransactions simply discard their lists. |
334 | * |
335 | * Note: the action and notify lists do not interact within a transaction. |
336 | * In particular, if a transaction does NOTIFY and then LISTEN on the same |
337 | * condition name, it will get a self-notify at commit. This is a bit odd |
338 | * but is consistent with our historical behavior. |
339 | */ |
340 | typedef struct Notification |
341 | { |
342 | char *channel; /* channel name */ |
343 | char *payload; /* payload string (can be empty) */ |
344 | } Notification; |
345 | |
346 | static List *pendingNotifies = NIL; /* list of Notifications */ |
347 | |
348 | static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ |
349 | |
350 | /* |
351 | * Inbound notifications are initially processed by HandleNotifyInterrupt(), |
352 | * called from inside a signal handler. That just sets the |
353 | * notifyInterruptPending flag and sets the process |
354 | * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to |
355 | * actually deal with the interrupt. |
356 | */ |
357 | volatile sig_atomic_t notifyInterruptPending = false; |
358 | |
359 | /* True if we've registered an on_shmem_exit cleanup */ |
360 | static bool unlistenExitRegistered = false; |
361 | |
362 | /* True if we're currently registered as a listener in asyncQueueControl */ |
363 | static bool amRegisteredListener = false; |
364 | |
365 | /* has this backend sent notifications in the current transaction? */ |
366 | static bool backendHasSentNotifications = false; |
367 | |
368 | /* GUC parameter */ |
369 | bool Trace_notify = false; |
370 | |
371 | /* local function prototypes */ |
372 | static bool asyncQueuePagePrecedes(int p, int q); |
373 | static void queue_listen(ListenActionKind action, const char *channel); |
374 | static void Async_UnlistenOnExit(int code, Datum arg); |
375 | static void Exec_ListenPreCommit(void); |
376 | static void Exec_ListenCommit(const char *channel); |
377 | static void Exec_UnlistenCommit(const char *channel); |
378 | static void Exec_UnlistenAllCommit(void); |
379 | static bool IsListeningOn(const char *channel); |
380 | static void asyncQueueUnregister(void); |
381 | static bool asyncQueueIsFull(void); |
382 | static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength); |
383 | static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); |
384 | static ListCell *asyncQueueAddEntries(ListCell *nextNotify); |
385 | static double asyncQueueUsage(void); |
386 | static void asyncQueueFillWarning(void); |
387 | static bool SignalBackends(void); |
388 | static void asyncQueueReadAllNotifications(void); |
389 | static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, |
390 | QueuePosition stop, |
391 | char *page_buffer, |
392 | Snapshot snapshot); |
393 | static void asyncQueueAdvanceTail(void); |
394 | static void ProcessIncomingNotify(void); |
395 | static bool AsyncExistsPendingNotify(const char *channel, const char *payload); |
396 | static void ClearPendingActionsAndNotifies(void); |
397 | |
398 | /* |
399 | * We will work on the page range of 0..QUEUE_MAX_PAGE. |
400 | */ |
401 | static bool |
402 | asyncQueuePagePrecedes(int p, int q) |
403 | { |
404 | int diff; |
405 | |
406 | /* |
407 | * We have to compare modulo (QUEUE_MAX_PAGE+1)/2. Both inputs should be |
408 | * in the range 0..QUEUE_MAX_PAGE. |
409 | */ |
410 | Assert(p >= 0 && p <= QUEUE_MAX_PAGE); |
411 | Assert(q >= 0 && q <= QUEUE_MAX_PAGE); |
412 | |
413 | diff = p - q; |
414 | if (diff >= ((QUEUE_MAX_PAGE + 1) / 2)) |
415 | diff -= QUEUE_MAX_PAGE + 1; |
416 | else if (diff < -((QUEUE_MAX_PAGE + 1) / 2)) |
417 | diff += QUEUE_MAX_PAGE + 1; |
418 | return diff < 0; |
419 | } |
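/*
 * For illustration, with SLRU_PAGES_PER_SEGMENT = 32 the comparison window
 * is (QUEUE_MAX_PAGE + 1) / 2 = 0x100000 pages.  Shortly after the head has
 * wrapped around, asyncQueuePagePrecedes(2097000, 10) is true while
 * asyncQueuePagePrecedes(10, 2097000) is false; that is, page 10 is treated
 * as logically newer than page 2097000 even though it is numerically smaller.
 */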
420 | |
421 | /* |
422 | * Report space needed for our shared memory area |
423 | */ |
424 | Size |
425 | AsyncShmemSize(void) |
426 | { |
427 | Size size; |
428 | |
429 | /* This had better match AsyncShmemInit */ |
430 | size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); |
431 | size = add_size(size, offsetof(AsyncQueueControl, backend)); |
432 | |
433 | size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0)); |
434 | |
435 | return size; |
436 | } |
437 | |
438 | /* |
439 | * Initialize our shared memory area |
440 | */ |
441 | void |
442 | AsyncShmemInit(void) |
443 | { |
444 | bool found; |
445 | int slotno; |
446 | Size size; |
447 | |
448 | /* |
449 | * Create or attach to the AsyncQueueControl structure. |
450 | * |
451 | * The used entries in the backend[] array run from 1 to MaxBackends; the |
452 | * zero'th entry is unused but must be allocated. |
453 | */ |
454 | size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); |
455 | size = add_size(size, offsetof(AsyncQueueControl, backend)); |
456 | |
457 | asyncQueueControl = (AsyncQueueControl *) |
		ShmemInitStruct("Async Queue Control", size, &found);
459 | |
460 | if (!found) |
461 | { |
462 | /* First time through, so initialize it */ |
463 | int i; |
464 | |
465 | SET_QUEUE_POS(QUEUE_HEAD, 0, 0); |
466 | SET_QUEUE_POS(QUEUE_TAIL, 0, 0); |
467 | asyncQueueControl->lastQueueFillWarn = 0; |
468 | /* zero'th entry won't be used, but let's initialize it anyway */ |
469 | for (i = 0; i <= MaxBackends; i++) |
470 | { |
471 | QUEUE_BACKEND_PID(i) = InvalidPid; |
472 | QUEUE_BACKEND_DBOID(i) = InvalidOid; |
473 | SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); |
474 | } |
475 | } |
476 | |
477 | /* |
478 | * Set up SLRU management of the pg_notify data. |
479 | */ |
480 | AsyncCtl->PagePrecedes = asyncQueuePagePrecedes; |
	SimpleLruInit(AsyncCtl, "async", NUM_ASYNC_BUFFERS, 0,
				  AsyncCtlLock, "pg_notify", LWTRANCHE_ASYNC_BUFFERS);
483 | /* Override default assumption that writes should be fsync'd */ |
484 | AsyncCtl->do_fsync = false; |
485 | |
486 | if (!found) |
487 | { |
488 | /* |
489 | * During start or reboot, clean out the pg_notify directory. |
490 | */ |
491 | (void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL); |
492 | |
493 | /* Now initialize page zero to empty */ |
494 | LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); |
495 | slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD)); |
496 | /* This write is just to verify that pg_notify/ is writable */ |
497 | SimpleLruWritePage(AsyncCtl, slotno); |
498 | LWLockRelease(AsyncCtlLock); |
499 | } |
500 | } |
501 | |
502 | |
503 | /* |
504 | * pg_notify - |
505 | * SQL function to send a notification event |
506 | */ |
507 | Datum |
508 | pg_notify(PG_FUNCTION_ARGS) |
509 | { |
510 | const char *channel; |
511 | const char *payload; |
512 | |
513 | if (PG_ARGISNULL(0)) |
		channel = "";
515 | else |
516 | channel = text_to_cstring(PG_GETARG_TEXT_PP(0)); |
517 | |
518 | if (PG_ARGISNULL(1)) |
		payload = "";
520 | else |
521 | payload = text_to_cstring(PG_GETARG_TEXT_PP(1)); |
522 | |
523 | /* For NOTIFY as a statement, this is checked in ProcessUtility */ |
	PreventCommandDuringRecovery("NOTIFY");
525 | |
526 | Async_Notify(channel, payload); |
527 | |
528 | PG_RETURN_VOID(); |
529 | } |
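/*
 * For example, "SELECT pg_notify('my_channel', 'hello')" queues the same
 * notification as the statement "NOTIFY my_channel, 'hello'".  The function
 * form is useful when the channel name or payload must be computed at run
 * time, since the NOTIFY statement takes a fixed identifier and string
 * literal.
 */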
530 | |
531 | |
532 | /* |
533 | * Async_Notify |
534 | * |
535 | * This is executed by the SQL notify command. |
536 | * |
537 | * Adds the message to the list of pending notifies. |
538 | * Actual notification happens during transaction commit. |
539 | * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
540 | */ |
541 | void |
542 | Async_Notify(const char *channel, const char *payload) |
543 | { |
544 | Notification *n; |
545 | MemoryContext oldcontext; |
546 | |
547 | if (IsParallelWorker()) |
		elog(ERROR, "cannot send notifications from a parallel worker");
549 | |
550 | if (Trace_notify) |
		elog(DEBUG1, "Async_Notify(%s)", channel);
552 | |
553 | /* a channel name must be specified */ |
554 | if (!channel || !strlen(channel)) |
555 | ereport(ERROR, |
556 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
				 errmsg("channel name cannot be empty")));
558 | |
559 | if (strlen(channel) >= NAMEDATALEN) |
560 | ereport(ERROR, |
561 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
				 errmsg("channel name too long")));
563 | |
564 | if (payload) |
565 | { |
566 | if (strlen(payload) >= NOTIFY_PAYLOAD_MAX_LENGTH) |
567 | ereport(ERROR, |
568 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
					 errmsg("payload string too long")));
570 | } |
571 | |
572 | /* no point in making duplicate entries in the list ... */ |
573 | if (AsyncExistsPendingNotify(channel, payload)) |
574 | return; |
575 | |
576 | /* |
577 | * The notification list needs to live until end of transaction, so store |
578 | * it in the transaction context. |
579 | */ |
580 | oldcontext = MemoryContextSwitchTo(CurTransactionContext); |
581 | |
582 | n = (Notification *) palloc(sizeof(Notification)); |
583 | n->channel = pstrdup(channel); |
584 | if (payload) |
585 | n->payload = pstrdup(payload); |
586 | else |
		n->payload = "";
588 | |
589 | /* |
590 | * We want to preserve the order so we need to append every notification. |
591 | * See comments at AsyncExistsPendingNotify(). |
592 | */ |
593 | pendingNotifies = lappend(pendingNotifies, n); |
594 | |
595 | MemoryContextSwitchTo(oldcontext); |
596 | } |
597 | |
598 | /* |
599 | * queue_listen |
600 | * Common code for listen, unlisten, unlisten all commands. |
601 | * |
602 | * Adds the request to the list of pending actions. |
603 | * Actual update of the listenChannels list happens during transaction |
604 | * commit. |
605 | */ |
606 | static void |
607 | queue_listen(ListenActionKind action, const char *channel) |
608 | { |
609 | MemoryContext oldcontext; |
610 | ListenAction *actrec; |
611 | |
612 | /* |
613 | * Unlike Async_Notify, we don't try to collapse out duplicates. It would |
614 | * be too complicated to ensure we get the right interactions of |
615 | * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there |
616 | * would be any performance benefit anyway in sane applications. |
617 | */ |
618 | oldcontext = MemoryContextSwitchTo(CurTransactionContext); |
619 | |
620 | /* space for terminating null is included in sizeof(ListenAction) */ |
621 | actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) + |
622 | strlen(channel) + 1); |
623 | actrec->action = action; |
624 | strcpy(actrec->channel, channel); |
625 | |
626 | pendingActions = lappend(pendingActions, actrec); |
627 | |
628 | MemoryContextSwitchTo(oldcontext); |
629 | } |
630 | |
631 | /* |
632 | * Async_Listen |
633 | * |
634 | * This is executed by the SQL listen command. |
635 | */ |
636 | void |
637 | Async_Listen(const char *channel) |
638 | { |
639 | if (Trace_notify) |
		elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
641 | |
642 | queue_listen(LISTEN_LISTEN, channel); |
643 | } |
644 | |
645 | /* |
646 | * Async_Unlisten |
647 | * |
648 | * This is executed by the SQL unlisten command. |
649 | */ |
650 | void |
651 | Async_Unlisten(const char *channel) |
652 | { |
653 | if (Trace_notify) |
		elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
655 | |
656 | /* If we couldn't possibly be listening, no need to queue anything */ |
657 | if (pendingActions == NIL && !unlistenExitRegistered) |
658 | return; |
659 | |
660 | queue_listen(LISTEN_UNLISTEN, channel); |
661 | } |
662 | |
663 | /* |
664 | * Async_UnlistenAll |
665 | * |
666 | * This is invoked by UNLISTEN * command, and also at backend exit. |
667 | */ |
668 | void |
669 | Async_UnlistenAll(void) |
670 | { |
671 | if (Trace_notify) |
		elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid);
673 | |
674 | /* If we couldn't possibly be listening, no need to queue anything */ |
675 | if (pendingActions == NIL && !unlistenExitRegistered) |
676 | return; |
677 | |
	queue_listen(LISTEN_UNLISTEN_ALL, "");
679 | } |
680 | |
681 | /* |
682 | * SQL function: return a set of the channel names this backend is actively |
683 | * listening to. |
684 | * |
685 | * Note: this coding relies on the fact that the listenChannels list cannot |
686 | * change within a transaction. |
687 | */ |
688 | Datum |
689 | pg_listening_channels(PG_FUNCTION_ARGS) |
690 | { |
691 | FuncCallContext *funcctx; |
692 | ListCell **lcp; |
693 | |
694 | /* stuff done only on the first call of the function */ |
695 | if (SRF_IS_FIRSTCALL()) |
696 | { |
697 | MemoryContext oldcontext; |
698 | |
699 | /* create a function context for cross-call persistence */ |
700 | funcctx = SRF_FIRSTCALL_INIT(); |
701 | |
702 | /* switch to memory context appropriate for multiple function calls */ |
703 | oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); |
704 | |
705 | /* allocate memory for user context */ |
706 | lcp = (ListCell **) palloc(sizeof(ListCell *)); |
707 | *lcp = list_head(listenChannels); |
708 | funcctx->user_fctx = (void *) lcp; |
709 | |
710 | MemoryContextSwitchTo(oldcontext); |
711 | } |
712 | |
713 | /* stuff done on every call of the function */ |
714 | funcctx = SRF_PERCALL_SETUP(); |
715 | lcp = (ListCell **) funcctx->user_fctx; |
716 | |
717 | while (*lcp != NULL) |
718 | { |
719 | char *channel = (char *) lfirst(*lcp); |
720 | |
721 | *lcp = lnext(*lcp); |
722 | SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); |
723 | } |
724 | |
725 | SRF_RETURN_DONE(funcctx); |
726 | } |
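/*
 * For example, in a session that has successfully committed "LISTEN foo" and
 * "LISTEN bar", "SELECT * FROM pg_listening_channels()" returns the rows
 * "foo" and "bar".
 */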
727 | |
728 | /* |
729 | * Async_UnlistenOnExit |
730 | * |
731 | * This is executed at backend exit if we have done any LISTENs in this |
732 | * backend. It might not be necessary anymore, if the user UNLISTENed |
733 | * everything, but we don't try to detect that case. |
734 | */ |
735 | static void |
736 | Async_UnlistenOnExit(int code, Datum arg) |
737 | { |
738 | Exec_UnlistenAllCommit(); |
739 | asyncQueueUnregister(); |
740 | } |
741 | |
742 | /* |
743 | * AtPrepare_Notify |
744 | * |
745 | * This is called at the prepare phase of a two-phase |
746 | * transaction. Save the state for possible commit later. |
747 | */ |
748 | void |
749 | AtPrepare_Notify(void) |
750 | { |
751 | /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */ |
752 | if (pendingActions || pendingNotifies) |
753 | ereport(ERROR, |
754 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
				 errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY")));
756 | } |
757 | |
758 | /* |
759 | * PreCommit_Notify |
760 | * |
761 | * This is called at transaction commit, before actually committing to |
762 | * clog. |
763 | * |
764 | * If there are pending LISTEN actions, make sure we are listed in the |
765 | * shared-memory listener array. This must happen before commit to |
766 | * ensure we don't miss any notifies from transactions that commit |
767 | * just after ours. |
768 | * |
769 | * If there are outbound notify requests in the pendingNotifies list, |
770 | * add them to the global queue. We do that before commit so that |
771 | * we can still throw error if we run out of queue space. |
772 | */ |
773 | void |
774 | PreCommit_Notify(void) |
775 | { |
776 | ListCell *p; |
777 | |
778 | if (pendingActions == NIL && pendingNotifies == NIL) |
779 | return; /* no relevant statements in this xact */ |
780 | |
781 | if (Trace_notify) |
		elog(DEBUG1, "PreCommit_Notify");
783 | |
784 | /* Preflight for any pending listen/unlisten actions */ |
785 | foreach(p, pendingActions) |
786 | { |
787 | ListenAction *actrec = (ListenAction *) lfirst(p); |
788 | |
789 | switch (actrec->action) |
790 | { |
791 | case LISTEN_LISTEN: |
792 | Exec_ListenPreCommit(); |
793 | break; |
794 | case LISTEN_UNLISTEN: |
795 | /* there is no Exec_UnlistenPreCommit() */ |
796 | break; |
797 | case LISTEN_UNLISTEN_ALL: |
798 | /* there is no Exec_UnlistenAllPreCommit() */ |
799 | break; |
800 | } |
801 | } |
802 | |
803 | /* Queue any pending notifies (must happen after the above) */ |
804 | if (pendingNotifies) |
805 | { |
806 | ListCell *nextNotify; |
807 | |
808 | /* |
809 | * Make sure that we have an XID assigned to the current transaction. |
810 | * GetCurrentTransactionId is cheap if we already have an XID, but not |
811 | * so cheap if we don't, and we'd prefer not to do that work while |
812 | * holding AsyncQueueLock. |
813 | */ |
814 | (void) GetCurrentTransactionId(); |
815 | |
816 | /* |
817 | * Serialize writers by acquiring a special lock that we hold till |
818 | * after commit. This ensures that queue entries appear in commit |
819 | * order, and in particular that there are never uncommitted queue |
820 | * entries ahead of committed ones, so an uncommitted transaction |
821 | * can't block delivery of deliverable notifications. |
822 | * |
823 | * We use a heavyweight lock so that it'll automatically be released |
824 | * after either commit or abort. This also allows deadlocks to be |
825 | * detected, though really a deadlock shouldn't be possible here. |
826 | * |
827 | * The lock is on "database 0", which is pretty ugly but it doesn't |
828 | * seem worth inventing a special locktag category just for this. |
829 | * (Historical note: before PG 9.0, a similar lock on "database 0" was |
830 | * used by the flatfiles mechanism.) |
831 | */ |
832 | LockSharedObject(DatabaseRelationId, InvalidOid, 0, |
833 | AccessExclusiveLock); |
834 | |
835 | /* Now push the notifications into the queue */ |
836 | backendHasSentNotifications = true; |
837 | |
838 | nextNotify = list_head(pendingNotifies); |
839 | while (nextNotify != NULL) |
840 | { |
841 | /* |
842 | * Add the pending notifications to the queue. We acquire and |
843 | * release AsyncQueueLock once per page, which might be overkill |
844 | * but it does allow readers to get in while we're doing this. |
845 | * |
846 | * A full queue is very uncommon and should really not happen, |
847 | * given that we have so much space available in the SLRU pages. |
848 | * Nevertheless we need to deal with this possibility. Note that |
849 | * when we get here we are in the process of committing our |
850 | * transaction, but we have not yet committed to clog, so at this |
851 | * point in time we can still roll the transaction back. |
852 | */ |
853 | LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); |
854 | asyncQueueFillWarning(); |
855 | if (asyncQueueIsFull()) |
856 | ereport(ERROR, |
857 | (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
						 errmsg("too many notifications in the NOTIFY queue")));
859 | nextNotify = asyncQueueAddEntries(nextNotify); |
860 | LWLockRelease(AsyncQueueLock); |
861 | } |
862 | } |
863 | } |
864 | |
865 | /* |
866 | * AtCommit_Notify |
867 | * |
868 | * This is called at transaction commit, after committing to clog. |
869 | * |
870 | * Update listenChannels and clear transaction-local state. |
871 | */ |
872 | void |
873 | AtCommit_Notify(void) |
874 | { |
875 | ListCell *p; |
876 | |
877 | /* |
878 | * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to |
879 | * return as soon as possible |
880 | */ |
881 | if (!pendingActions && !pendingNotifies) |
882 | return; |
883 | |
884 | if (Trace_notify) |
		elog(DEBUG1, "AtCommit_Notify");
886 | |
887 | /* Perform any pending listen/unlisten actions */ |
888 | foreach(p, pendingActions) |
889 | { |
890 | ListenAction *actrec = (ListenAction *) lfirst(p); |
891 | |
892 | switch (actrec->action) |
893 | { |
894 | case LISTEN_LISTEN: |
895 | Exec_ListenCommit(actrec->channel); |
896 | break; |
897 | case LISTEN_UNLISTEN: |
898 | Exec_UnlistenCommit(actrec->channel); |
899 | break; |
900 | case LISTEN_UNLISTEN_ALL: |
901 | Exec_UnlistenAllCommit(); |
902 | break; |
903 | } |
904 | } |
905 | |
906 | /* If no longer listening to anything, get out of listener array */ |
907 | if (amRegisteredListener && listenChannels == NIL) |
908 | asyncQueueUnregister(); |
909 | |
910 | /* And clean up */ |
911 | ClearPendingActionsAndNotifies(); |
912 | } |
913 | |
914 | /* |
915 | * Exec_ListenPreCommit --- subroutine for PreCommit_Notify |
916 | * |
917 | * This function must make sure we are ready to catch any incoming messages. |
918 | */ |
919 | static void |
920 | Exec_ListenPreCommit(void) |
921 | { |
922 | QueuePosition head; |
923 | QueuePosition max; |
924 | int i; |
925 | |
926 | /* |
927 | * Nothing to do if we are already listening to something, nor if we |
928 | * already ran this routine in this transaction. |
929 | */ |
930 | if (amRegisteredListener) |
931 | return; |
932 | |
933 | if (Trace_notify) |
		elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid);
935 | |
936 | /* |
937 | * Before registering, make sure we will unlisten before dying. (Note: |
938 | * this action does not get undone if we abort later.) |
939 | */ |
940 | if (!unlistenExitRegistered) |
941 | { |
942 | before_shmem_exit(Async_UnlistenOnExit, 0); |
943 | unlistenExitRegistered = true; |
944 | } |
945 | |
946 | /* |
947 | * This is our first LISTEN, so establish our pointer. |
948 | * |
949 | * We set our pointer to the global tail pointer and then move it forward |
950 | * over already-committed notifications. This ensures we cannot miss any |
951 | * not-yet-committed notifications. We might get a few more but that |
952 | * doesn't hurt. |
953 | * |
954 | * In some scenarios there might be a lot of committed notifications that |
955 | * have not yet been pruned away (because some backend is being lazy about |
956 | * reading them). To reduce our startup time, we can look at other |
957 | * backends and adopt the maximum "pos" pointer of any backend that's in |
958 | * our database; any notifications it's already advanced over are surely |
959 | * committed and need not be re-examined by us. (We must consider only |
960 | * backends connected to our DB, because others will not have bothered to |
961 | * check committed-ness of notifications in our DB.) But we only bother |
962 | * with that if there's more than a page worth of notifications |
963 | * outstanding, otherwise scanning all the other backends isn't worth it. |
964 | * |
965 | * We need exclusive lock here so we can look at other backends' entries. |
966 | */ |
967 | LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); |
968 | head = QUEUE_HEAD; |
969 | max = QUEUE_TAIL; |
970 | if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head)) |
971 | { |
972 | for (i = 1; i <= MaxBackends; i++) |
973 | { |
974 | if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) |
975 | max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); |
976 | } |
977 | } |
978 | QUEUE_BACKEND_POS(MyBackendId) = max; |
979 | QUEUE_BACKEND_PID(MyBackendId) = MyProcPid; |
980 | QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId; |
981 | LWLockRelease(AsyncQueueLock); |
982 | |
983 | /* Now we are listed in the global array, so remember we're listening */ |
984 | amRegisteredListener = true; |
985 | |
986 | /* |
987 | * Try to move our pointer forward as far as possible. This will skip over |
988 | * already-committed notifications. Still, we could get notifications that |
989 | * have already committed before we started to LISTEN. |
990 | * |
991 | * Note that we are not yet listening on anything, so we won't deliver any |
992 | * notification to the frontend. Also, although our transaction might |
993 | * have executed NOTIFY, those message(s) aren't queued yet so we can't |
994 | * see them in the queue. |
995 | * |
996 | * This will also advance the global tail pointer if possible. |
997 | */ |
998 | if (!QUEUE_POS_EQUAL(max, head)) |
999 | asyncQueueReadAllNotifications(); |
1000 | } |
1001 | |
1002 | /* |
1003 | * Exec_ListenCommit --- subroutine for AtCommit_Notify |
1004 | * |
1005 | * Add the channel to the list of channels we are listening on. |
1006 | */ |
1007 | static void |
1008 | Exec_ListenCommit(const char *channel) |
1009 | { |
1010 | MemoryContext oldcontext; |
1011 | |
1012 | /* Do nothing if we are already listening on this channel */ |
1013 | if (IsListeningOn(channel)) |
1014 | return; |
1015 | |
1016 | /* |
1017 | * Add the new channel name to listenChannels. |
1018 | * |
1019 | * XXX It is theoretically possible to get an out-of-memory failure here, |
1020 | * which would be bad because we already committed. For the moment it |
1021 | * doesn't seem worth trying to guard against that, but maybe improve this |
1022 | * later. |
1023 | */ |
1024 | oldcontext = MemoryContextSwitchTo(TopMemoryContext); |
1025 | listenChannels = lappend(listenChannels, pstrdup(channel)); |
1026 | MemoryContextSwitchTo(oldcontext); |
1027 | } |
1028 | |
1029 | /* |
1030 | * Exec_UnlistenCommit --- subroutine for AtCommit_Notify |
1031 | * |
1032 | * Remove the specified channel name from listenChannels. |
1033 | */ |
1034 | static void |
1035 | Exec_UnlistenCommit(const char *channel) |
1036 | { |
1037 | ListCell *q; |
1038 | ListCell *prev; |
1039 | |
1040 | if (Trace_notify) |
		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
1042 | |
1043 | prev = NULL; |
1044 | foreach(q, listenChannels) |
1045 | { |
1046 | char *lchan = (char *) lfirst(q); |
1047 | |
1048 | if (strcmp(lchan, channel) == 0) |
1049 | { |
1050 | listenChannels = list_delete_cell(listenChannels, q, prev); |
1051 | pfree(lchan); |
1052 | break; |
1053 | } |
1054 | prev = q; |
1055 | } |
1056 | |
1057 | /* |
	 * We do not complain about unlistening a channel we were not listening
	 * on; should we?
1060 | */ |
1061 | } |
1062 | |
1063 | /* |
1064 | * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify |
1065 | * |
1066 | * Unlisten on all channels for this backend. |
1067 | */ |
1068 | static void |
1069 | Exec_UnlistenAllCommit(void) |
1070 | { |
1071 | if (Trace_notify) |
		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
1073 | |
1074 | list_free_deep(listenChannels); |
1075 | listenChannels = NIL; |
1076 | } |
1077 | |
1078 | /* |
1079 | * ProcessCompletedNotifies --- send out signals and self-notifies |
1080 | * |
1081 | * This is called from postgres.c just before going idle at the completion |
1082 | * of a transaction. If we issued any notifications in the just-completed |
1083 | * transaction, send signals to other backends to process them, and also |
1084 | * process the queue ourselves to send messages to our own frontend. |
1085 | * |
1086 | * The reason that this is not done in AtCommit_Notify is that there is |
1087 | * a nonzero chance of errors here (for example, encoding conversion errors |
1088 | * while trying to format messages to our frontend). An error during |
1089 | * AtCommit_Notify would be a PANIC condition. The timing is also arranged |
1090 | * to ensure that a transaction's self-notifies are delivered to the frontend |
1091 | * before it gets the terminating ReadyForQuery message. |
1092 | * |
1093 | * Note that we send signals and process the queue even if the transaction |
1094 | * eventually aborted. This is because we need to clean out whatever got |
1095 | * added to the queue. |
1096 | * |
1097 | * NOTE: we are outside of any transaction here. |
1098 | */ |
1099 | void |
1100 | ProcessCompletedNotifies(void) |
1101 | { |
1102 | MemoryContext caller_context; |
1103 | bool signalled; |
1104 | |
1105 | /* Nothing to do if we didn't send any notifications */ |
1106 | if (!backendHasSentNotifications) |
1107 | return; |
1108 | |
1109 | /* |
1110 | * We reset the flag immediately; otherwise, if any sort of error occurs |
1111 | * below, we'd be locked up in an infinite loop, because control will come |
1112 | * right back here after error cleanup. |
1113 | */ |
1114 | backendHasSentNotifications = false; |
1115 | |
1116 | /* |
1117 | * We must preserve the caller's memory context (probably MessageContext) |
1118 | * across the transaction we do here. |
1119 | */ |
1120 | caller_context = CurrentMemoryContext; |
1121 | |
1122 | if (Trace_notify) |
		elog(DEBUG1, "ProcessCompletedNotifies");
1124 | |
1125 | /* |
1126 | * We must run asyncQueueReadAllNotifications inside a transaction, else |
1127 | * bad things happen if it gets an error. |
1128 | */ |
1129 | StartTransactionCommand(); |
1130 | |
1131 | /* Send signals to other backends */ |
1132 | signalled = SignalBackends(); |
1133 | |
1134 | if (listenChannels != NIL) |
1135 | { |
1136 | /* Read the queue ourselves, and send relevant stuff to the frontend */ |
1137 | asyncQueueReadAllNotifications(); |
1138 | } |
1139 | else if (!signalled) |
1140 | { |
1141 | /* |
1142 | * If we found no other listening backends, and we aren't listening |
1143 | * ourselves, then we must execute asyncQueueAdvanceTail to flush the |
1144 | * queue, because ain't nobody else gonna do it. This prevents queue |
1145 | * overflow when we're sending useless notifies to nobody. (A new |
1146 | * listener could have joined since we looked, but if so this is |
1147 | * harmless.) |
1148 | */ |
1149 | asyncQueueAdvanceTail(); |
1150 | } |
1151 | |
1152 | CommitTransactionCommand(); |
1153 | |
1154 | MemoryContextSwitchTo(caller_context); |
1155 | |
1156 | /* We don't need pq_flush() here since postgres.c will do one shortly */ |
1157 | } |
1158 | |
1159 | /* |
1160 | * Test whether we are actively listening on the given channel name. |
1161 | * |
1162 | * Note: this function is executed for every notification found in the queue. |
1163 | * Perhaps it is worth further optimization, eg convert the list to a sorted |
1164 | * array so we can binary-search it. In practice the list is likely to be |
1165 | * fairly short, though. |
1166 | */ |
1167 | static bool |
1168 | IsListeningOn(const char *channel) |
1169 | { |
1170 | ListCell *p; |
1171 | |
1172 | foreach(p, listenChannels) |
1173 | { |
1174 | char *lchan = (char *) lfirst(p); |
1175 | |
1176 | if (strcmp(lchan, channel) == 0) |
1177 | return true; |
1178 | } |
1179 | return false; |
1180 | } |
1181 | |
1182 | /* |
1183 | * Remove our entry from the listeners array when we are no longer listening |
1184 | * on any channel. NB: must not fail if we're already not listening. |
1185 | */ |
1186 | static void |
1187 | asyncQueueUnregister(void) |
1188 | { |
1189 | bool advanceTail; |
1190 | |
1191 | Assert(listenChannels == NIL); /* else caller error */ |
1192 | |
1193 | if (!amRegisteredListener) /* nothing to do */ |
1194 | return; |
1195 | |
1196 | LWLockAcquire(AsyncQueueLock, LW_SHARED); |
1197 | /* check if entry is valid and oldest ... */ |
1198 | advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) && |
1199 | QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL); |
1200 | /* ... then mark it invalid */ |
1201 | QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; |
1202 | QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid; |
1203 | LWLockRelease(AsyncQueueLock); |
1204 | |
1205 | /* mark ourselves as no longer listed in the global array */ |
1206 | amRegisteredListener = false; |
1207 | |
1208 | /* If we were the laziest backend, try to advance the tail pointer */ |
1209 | if (advanceTail) |
1210 | asyncQueueAdvanceTail(); |
1211 | } |
1212 | |
1213 | /* |
1214 | * Test whether there is room to insert more notification messages. |
1215 | * |
1216 | * Caller must hold at least shared AsyncQueueLock. |
1217 | */ |
1218 | static bool |
1219 | asyncQueueIsFull(void) |
1220 | { |
1221 | int nexthead; |
1222 | int boundary; |
1223 | |
1224 | /* |
1225 | * The queue is full if creating a new head page would create a page that |
1226 | * logically precedes the current global tail pointer, ie, the head |
1227 | * pointer would wrap around compared to the tail. We cannot create such |
1228 | * a head page for fear of confusing slru.c. For safety we round the tail |
1229 | * pointer back to a segment boundary (compare the truncation logic in |
1230 | * asyncQueueAdvanceTail). |
1231 | * |
1232 | * Note that this test is *not* dependent on how much space there is on |
1233 | * the current head page. This is necessary because asyncQueueAddEntries |
1234 | * might try to create the next head page in any case. |
1235 | */ |
1236 | nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1; |
1237 | if (nexthead > QUEUE_MAX_PAGE) |
1238 | nexthead = 0; /* wrap around */ |
1239 | boundary = QUEUE_POS_PAGE(QUEUE_TAIL); |
1240 | boundary -= boundary % SLRU_PAGES_PER_SEGMENT; |
1241 | return asyncQueuePagePrecedes(nexthead, boundary); |
1242 | } |
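/*
 * For example, with SLRU_PAGES_PER_SEGMENT = 32 and QUEUE_TAIL on page 100,
 * the boundary computed here is page 96.  The queue is then reported full as
 * soon as advancing the head would require a new page that logically
 * precedes page 96, i.e. once the head has wrapped around to just behind the
 * tail's segment.
 */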
1243 | |
1244 | /* |
1245 | * Advance the QueuePosition to the next entry, assuming that the current |
1246 | * entry is of length entryLength. If we jump to a new page the function |
1247 | * returns true, else false. |
1248 | */ |
1249 | static bool |
1250 | asyncQueueAdvance(volatile QueuePosition *position, int entryLength) |
1251 | { |
1252 | int pageno = QUEUE_POS_PAGE(*position); |
1253 | int offset = QUEUE_POS_OFFSET(*position); |
1254 | bool pageJump = false; |
1255 | |
1256 | /* |
1257 | * Move to the next writing position: First jump over what we have just |
1258 | * written or read. |
1259 | */ |
1260 | offset += entryLength; |
1261 | Assert(offset <= QUEUE_PAGESIZE); |
1262 | |
1263 | /* |
1264 | * In a second step check if another entry can possibly be written to the |
1265 | * page. If so, stay here, we have reached the next position. If not, then |
1266 | * we need to move on to the next page. |
1267 | */ |
1268 | if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE) |
1269 | { |
1270 | pageno++; |
1271 | if (pageno > QUEUE_MAX_PAGE) |
1272 | pageno = 0; /* wrap around */ |
1273 | offset = 0; |
1274 | pageJump = true; |
1275 | } |
1276 | |
1277 | SET_QUEUE_POS(*position, pageno, offset); |
1278 | return pageJump; |
1279 | } |
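/*
 * For example, with BLCKSZ = 8192: after consuming a 72-byte entry that
 * started at offset 8100, the position becomes offset 8172; another minimal
 * entry of QUEUEALIGN(AsyncQueueEntryEmptySize) = 20 bytes would still fit
 * exactly, so we stay on the same page and return false.  Had the entry
 * ended at offset 8176, we would instead move to offset 0 of the next page
 * and return true.
 */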
1280 | |
1281 | /* |
1282 | * Fill the AsyncQueueEntry at *qe with an outbound notification message. |
1283 | */ |
1284 | static void |
1285 | asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) |
1286 | { |
1287 | size_t channellen = strlen(n->channel); |
1288 | size_t payloadlen = strlen(n->payload); |
1289 | int entryLength; |
1290 | |
1291 | Assert(channellen < NAMEDATALEN); |
1292 | Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH); |
1293 | |
1294 | /* The terminators are already included in AsyncQueueEntryEmptySize */ |
1295 | entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen; |
1296 | entryLength = QUEUEALIGN(entryLength); |
1297 | qe->length = entryLength; |
1298 | qe->dboid = MyDatabaseId; |
1299 | qe->xid = GetCurrentTransactionId(); |
1300 | qe->srcPid = MyProcPid; |
1301 | memcpy(qe->data, n->channel, channellen + 1); |
1302 | memcpy(qe->data + channellen + 1, n->payload, payloadlen + 1); |
1303 | } |
1304 | |
1305 | /* |
1306 | * Add pending notifications to the queue. |
1307 | * |
1308 | * We go page by page here, i.e. we stop once we have to go to a new page but |
1309 | * we will be called again and then fill that next page. If an entry does not |
1310 | * fit into the current page, we write a dummy entry with an InvalidOid as the |
1311 | * database OID in order to fill the page. So every page is always used up to |
1312 | * the last byte which simplifies reading the page later. |
1313 | * |
1314 | * We are passed the list cell containing the next notification to write |
1315 | * and return the first still-unwritten cell back. Eventually we will return |
1316 | * NULL indicating all is done. |
1317 | * |
1318 | * We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock |
1319 | * locally in this function. |
1320 | */ |
1321 | static ListCell * |
1322 | asyncQueueAddEntries(ListCell *nextNotify) |
1323 | { |
1324 | AsyncQueueEntry qe; |
1325 | QueuePosition queue_head; |
1326 | int pageno; |
1327 | int offset; |
1328 | int slotno; |
1329 | |
1330 | /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */ |
1331 | LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); |
1332 | |
1333 | /* |
1334 | * We work with a local copy of QUEUE_HEAD, which we write back to shared |
1335 | * memory upon exiting. The reason for this is that if we have to advance |
1336 | * to a new page, SimpleLruZeroPage might fail (out of disk space, for |
1337 | * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise, |
1338 | * subsequent insertions would try to put entries into a page that slru.c |
1339 | * thinks doesn't exist yet.) So, use a local position variable. Note |
1340 | * that if we do fail, any already-inserted queue entries are forgotten; |
1341 | * this is okay, since they'd be useless anyway after our transaction |
1342 | * rolls back. |
1343 | */ |
1344 | queue_head = QUEUE_HEAD; |
1345 | |
1346 | /* Fetch the current page */ |
1347 | pageno = QUEUE_POS_PAGE(queue_head); |
1348 | slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId); |
1349 | /* Note we mark the page dirty before writing in it */ |
1350 | AsyncCtl->shared->page_dirty[slotno] = true; |
1351 | |
1352 | while (nextNotify != NULL) |
1353 | { |
1354 | Notification *n = (Notification *) lfirst(nextNotify); |
1355 | |
1356 | /* Construct a valid queue entry in local variable qe */ |
1357 | asyncQueueNotificationToEntry(n, &qe); |
1358 | |
1359 | offset = QUEUE_POS_OFFSET(queue_head); |
1360 | |
1361 | /* Check whether the entry really fits on the current page */ |
1362 | if (offset + qe.length <= QUEUE_PAGESIZE) |
1363 | { |
1364 | /* OK, so advance nextNotify past this item */ |
1365 | nextNotify = lnext(nextNotify); |
1366 | } |
1367 | else |
1368 | { |
1369 | /* |
1370 | * Write a dummy entry to fill up the page. Actually readers will |
1371 | * only check dboid and since it won't match any reader's database |
1372 | * OID, they will ignore this entry and move on. |
1373 | */ |
1374 | qe.length = QUEUE_PAGESIZE - offset; |
1375 | qe.dboid = InvalidOid; |
1376 | qe.data[0] = '\0'; /* empty channel */ |
1377 | qe.data[1] = '\0'; /* empty payload */ |
1378 | } |
1379 | |
1380 | /* Now copy qe into the shared buffer page */ |
1381 | memcpy(AsyncCtl->shared->page_buffer[slotno] + offset, |
1382 | &qe, |
1383 | qe.length); |
1384 | |
1385 | /* Advance queue_head appropriately, and detect if page is full */ |
1386 | if (asyncQueueAdvance(&(queue_head), qe.length)) |
1387 | { |
1388 | /* |
1389 | * Page is full, so we're done here, but first fill the next page |
1390 | * with zeroes. The reason to do this is to ensure that slru.c's |
1391 | * idea of the head page is always the same as ours, which avoids |
1392 | * boundary problems in SimpleLruTruncate. The test in |
1393 | * asyncQueueIsFull() ensured that there is room to create this |
1394 | * page without overrunning the queue. |
1395 | */ |
1396 | slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head)); |
1397 | /* And exit the loop */ |
1398 | break; |
1399 | } |
1400 | } |
1401 | |
1402 | /* Success, so update the global QUEUE_HEAD */ |
1403 | QUEUE_HEAD = queue_head; |
1404 | |
1405 | LWLockRelease(AsyncCtlLock); |
1406 | |
1407 | return nextNotify; |
1408 | } |
1409 | |
1410 | /* |
1411 | * SQL function to return the fraction of the notification queue currently |
1412 | * occupied. |
1413 | */ |
1414 | Datum |
1415 | pg_notification_queue_usage(PG_FUNCTION_ARGS) |
1416 | { |
1417 | double usage; |
1418 | |
1419 | LWLockAcquire(AsyncQueueLock, LW_SHARED); |
1420 | usage = asyncQueueUsage(); |
1421 | LWLockRelease(AsyncQueueLock); |
1422 | |
1423 | PG_RETURN_FLOAT8(usage); |
1424 | } |
1425 | |
1426 | /* |
1427 | * Return the fraction of the queue that is currently occupied. |
1428 | * |
1429 | * The caller must hold AsyncQueueLock in (at least) shared mode. |
1430 | */ |
1431 | static double |
1432 | asyncQueueUsage(void) |
1433 | { |
1434 | int headPage = QUEUE_POS_PAGE(QUEUE_HEAD); |
1435 | int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); |
1436 | int occupied; |
1437 | |
1438 | occupied = headPage - tailPage; |
1439 | |
1440 | if (occupied == 0) |
1441 | return (double) 0; /* fast exit for common case */ |
1442 | |
1443 | if (occupied < 0) |
1444 | { |
1445 | /* head has wrapped around, tail not yet */ |
1446 | occupied += QUEUE_MAX_PAGE + 1; |
1447 | } |
1448 | |
1449 | return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2); |
1450 | } |
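/*
 * For example, if the tail is on page 1000 and the head on page 525288, then
 * 524288 pages are in use, which is exactly half of the
 * (QUEUE_MAX_PAGE + 1) / 2 = 1048576 usable pages, so the result is 0.5.
 */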
1451 | |
1452 | /* |
1453 | * Check whether the queue is at least half full, and emit a warning if so. |
1454 | * |
1455 | * This is unlikely given the size of the queue, but possible. |
1456 | * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL. |
1457 | * |
1458 | * Caller must hold exclusive AsyncQueueLock. |
1459 | */ |
1460 | static void |
1461 | asyncQueueFillWarning(void) |
1462 | { |
1463 | double fillDegree; |
1464 | TimestampTz t; |
1465 | |
1466 | fillDegree = asyncQueueUsage(); |
1467 | if (fillDegree < 0.5) |
1468 | return; |
1469 | |
1470 | t = GetCurrentTimestamp(); |
1471 | |
1472 | if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn, |
1473 | t, QUEUE_FULL_WARN_INTERVAL)) |
1474 | { |
1475 | QueuePosition min = QUEUE_HEAD; |
1476 | int32 minPid = InvalidPid; |
1477 | int i; |
1478 | |
1479 | for (i = 1; i <= MaxBackends; i++) |
1480 | { |
1481 | if (QUEUE_BACKEND_PID(i) != InvalidPid) |
1482 | { |
1483 | min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); |
1484 | if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i))) |
1485 | minPid = QUEUE_BACKEND_PID(i); |
1486 | } |
1487 | } |
1488 | |
1489 | ereport(WARNING, |
1490 | (errmsg("NOTIFY queue is %.0f%% full" , fillDegree * 100), |
1491 | (minPid != InvalidPid ? |
1492 | errdetail("The server process with PID %d is among those with the oldest transactions." , minPid) |
1493 | : 0), |
1494 | (minPid != InvalidPid ? |
1495 | errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction." ) |
1496 | : 0))); |
1497 | |
1498 | asyncQueueControl->lastQueueFillWarn = t; |
1499 | } |
1500 | } |
1501 | |
1502 | /* |
1503 | * Send signals to all listening backends (except our own). |
1504 | * |
1505 | * Returns true if we sent at least one signal. |
1506 | * |
1507 | * Since we need EXCLUSIVE lock anyway we also check the position of the other |
1508 | * backends and in case one is already up-to-date we don't signal it. |
1509 | * This can happen if concurrent notifying transactions have sent a signal and |
1510 | * the signaled backend has read the other notifications and ours in the same |
1511 | * step. |
1512 | * |
 * Since we know the BackendId and the Pid, the signalling is quite cheap.
1514 | */ |
1515 | static bool |
1516 | SignalBackends(void) |
1517 | { |
1518 | bool signalled = false; |
1519 | int32 *pids; |
1520 | BackendId *ids; |
1521 | int count; |
1522 | int i; |
1523 | int32 pid; |
1524 | |
1525 | /* |
1526 | * Identify all backends that are listening and not already up-to-date. We |
1527 | * don't want to send signals while holding the AsyncQueueLock, so we just |
1528 | * build a list of target PIDs. |
1529 | * |
1530 | * XXX in principle these pallocs could fail, which would be bad. Maybe |
1531 | * preallocate the arrays? But in practice this is only run in trivial |
1532 | * transactions, so there should surely be space available. |
1533 | */ |
1534 | pids = (int32 *) palloc(MaxBackends * sizeof(int32)); |
1535 | ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId)); |
1536 | count = 0; |
1537 | |
1538 | LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); |
1539 | for (i = 1; i <= MaxBackends; i++) |
1540 | { |
1541 | pid = QUEUE_BACKEND_PID(i); |
1542 | if (pid != InvalidPid && pid != MyProcPid) |
1543 | { |
1544 | QueuePosition pos = QUEUE_BACKEND_POS(i); |
1545 | |
1546 | if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) |
1547 | { |
1548 | pids[count] = pid; |
1549 | ids[count] = i; |
1550 | count++; |
1551 | } |
1552 | } |
1553 | } |
1554 | LWLockRelease(AsyncQueueLock); |
1555 | |
1556 | /* Now send signals */ |
1557 | for (i = 0; i < count; i++) |
1558 | { |
1559 | pid = pids[i]; |
1560 | |
1561 | /* |
1562 | * Note: assuming things aren't broken, a signal failure here could |
1563 | * only occur if the target backend exited since we released |
1564 | * AsyncQueueLock; which is unlikely but certainly possible. So we |
1565 | * just log a low-level debug message if it happens. |
1566 | */ |
1567 | if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0) |
1568 | elog(DEBUG3, "could not signal backend with PID %d: %m" , pid); |
1569 | else |
1570 | signalled = true; |
1571 | } |
1572 | |
1573 | pfree(pids); |
1574 | pfree(ids); |
1575 | |
1576 | return signalled; |
1577 | } |
1578 | |
1579 | /* |
1580 | * AtAbort_Notify |
1581 | * |
1582 | * This is called at transaction abort. |
1583 | * |
1584 | * Gets rid of pending actions and outbound notifies that we would have |
1585 | * executed if the transaction got committed. |
1586 | */ |
1587 | void |
1588 | AtAbort_Notify(void) |
1589 | { |
1590 | /* |
1591 | * If we LISTEN but then roll back the transaction after PreCommit_Notify, |
1592 | * we have registered as a listener but have not made any entry in |
1593 | * listenChannels. In that case, deregister again. |
1594 | */ |
1595 | if (amRegisteredListener && listenChannels == NIL) |
1596 | asyncQueueUnregister(); |
1597 | |
1598 | /* And clean up */ |
1599 | ClearPendingActionsAndNotifies(); |
1600 | } |
1601 | |
1602 | /* |
1603 | * AtSubStart_Notify() --- Take care of subtransaction start. |
1604 | * |
1605 | * Push empty state for the new subtransaction. |
1606 | */ |
1607 | void |
1608 | AtSubStart_Notify(void) |
1609 | { |
1610 | MemoryContext old_cxt; |
1611 | |
1612 | /* Keep the list-of-lists in TopTransactionContext for simplicity */ |
1613 | old_cxt = MemoryContextSwitchTo(TopTransactionContext); |
1614 | |
1615 | upperPendingActions = lcons(pendingActions, upperPendingActions); |
1616 | |
1617 | Assert(list_length(upperPendingActions) == |
1618 | GetCurrentTransactionNestLevel() - 1); |
1619 | |
1620 | pendingActions = NIL; |
1621 | |
1622 | upperPendingNotifies = lcons(pendingNotifies, upperPendingNotifies); |
1623 | |
1624 | Assert(list_length(upperPendingNotifies) == |
1625 | GetCurrentTransactionNestLevel() - 1); |
1626 | |
1627 | pendingNotifies = NIL; |
1628 | |
1629 | MemoryContextSwitchTo(old_cxt); |
1630 | } |
1631 | |
1632 | /* |
1633 | * AtSubCommit_Notify() --- Take care of subtransaction commit. |
1634 | * |
1635 | * Reassign all items in the pending lists to the parent transaction. |
1636 | */ |
1637 | void |
1638 | AtSubCommit_Notify(void) |
1639 | { |
1640 | List *parentPendingActions; |
1641 | List *parentPendingNotifies; |
1642 | |
1643 | parentPendingActions = linitial_node(List, upperPendingActions); |
1644 | upperPendingActions = list_delete_first(upperPendingActions); |
1645 | |
1646 | Assert(list_length(upperPendingActions) == |
1647 | GetCurrentTransactionNestLevel() - 2); |
1648 | |
1649 | /* |
1650 | * Mustn't try to eliminate duplicates here --- see queue_listen() |
1651 | */ |
1652 | pendingActions = list_concat(parentPendingActions, pendingActions); |
1653 | |
1654 | parentPendingNotifies = linitial_node(List, upperPendingNotifies); |
1655 | upperPendingNotifies = list_delete_first(upperPendingNotifies); |
1656 | |
1657 | Assert(list_length(upperPendingNotifies) == |
1658 | GetCurrentTransactionNestLevel() - 2); |
1659 | |
1660 | /* |
1661 | * We could try to eliminate duplicates here, but it seems not worthwhile. |
1662 | */ |
1663 | pendingNotifies = list_concat(parentPendingNotifies, pendingNotifies); |
1664 | } |
1665 | |
1666 | /* |
1667 | * AtSubAbort_Notify() --- Take care of subtransaction abort. |
1668 | */ |
1669 | void |
1670 | AtSubAbort_Notify(void) |
1671 | { |
1672 | int my_level = GetCurrentTransactionNestLevel(); |
1673 | |
1674 | /* |
1675 | * All we have to do is pop the stack --- the actions/notifies made in |
1676 | * this subxact are no longer interesting, and the space will be freed |
1677 | * when CurTransactionContext is recycled. |
1678 | * |
1679 | * This routine could be called more than once at a given nesting level if |
1680 | * there is trouble during subxact abort. Avoid dumping core by using |
1681 | * GetCurrentTransactionNestLevel as the indicator of how far we need to |
1682 | * prune the list. |
1683 | */ |
1684 | while (list_length(upperPendingActions) > my_level - 2) |
1685 | { |
1686 | pendingActions = linitial_node(List, upperPendingActions); |
1687 | upperPendingActions = list_delete_first(upperPendingActions); |
1688 | } |
1689 | |
1690 | while (list_length(upperPendingNotifies) > my_level - 2) |
1691 | { |
1692 | pendingNotifies = linitial_node(List, upperPendingNotifies); |
1693 | upperPendingNotifies = list_delete_first(upperPendingNotifies); |
1694 | } |
1695 | } |
1696 | |
1697 | /* |
1698 | * HandleNotifyInterrupt |
1699 | * |
1700 | * Signal handler portion of interrupt handling. Let the backend know |
1701 | * that there's a pending notify interrupt. If we're currently reading |
1702 | * from the client, this will interrupt the read and |
1703 | * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt(). |
1704 | */ |
1705 | void |
1706 | HandleNotifyInterrupt(void) |
1707 | { |
1708 | /* |
1709 | * Note: this is called by a SIGNAL HANDLER. You must be very wary what |
1710 | * you do here. |
1711 | */ |
1712 | |
1713 | /* signal that work needs to be done */ |
1714 | notifyInterruptPending = true; |
1715 | |
1716 | /* make sure the event is processed in due course */ |
1717 | SetLatch(MyLatch); |
1718 | } |
1719 | |
1720 | /* |
1721 | * ProcessNotifyInterrupt |
1722 | * |
 * This is called just after waiting for a frontend command.  If an
 * interrupt arrives (via HandleNotifyInterrupt()) while reading, the
1725 | * read will be interrupted via the process's latch, and this routine |
1726 | * will get called. If we are truly idle (ie, *not* inside a transaction |
1727 | * block), process the incoming notifies. |
1728 | */ |
1729 | void |
1730 | ProcessNotifyInterrupt(void) |
1731 | { |
1732 | if (IsTransactionOrTransactionBlock()) |
1733 | return; /* not really idle */ |
1734 | |
1735 | while (notifyInterruptPending) |
1736 | ProcessIncomingNotify(); |
1737 | } |
1738 | |
1739 | |
1740 | /* |
1741 | * Read all pending notifications from the queue, and deliver appropriate |
1742 | * ones to my frontend. Stop when we reach queue head or an uncommitted |
1743 | * notification. |
1744 | */ |
1745 | static void |
1746 | asyncQueueReadAllNotifications(void) |
1747 | { |
1748 | volatile QueuePosition pos; |
1749 | QueuePosition oldpos; |
1750 | QueuePosition head; |
1751 | Snapshot snapshot; |
1752 | bool advanceTail; |
1753 | |
1754 | /* page_buffer must be adequately aligned, so use a union */ |
1755 | union |
1756 | { |
1757 | char buf[QUEUE_PAGESIZE]; |
1758 | AsyncQueueEntry align; |
1759 | } page_buffer; |
1760 | |
1761 | /* Fetch current state */ |
1762 | LWLockAcquire(AsyncQueueLock, LW_SHARED); |
1763 | /* Assert checks that we have a valid state entry */ |
1764 | Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId)); |
1765 | pos = oldpos = QUEUE_BACKEND_POS(MyBackendId); |
1766 | head = QUEUE_HEAD; |
1767 | LWLockRelease(AsyncQueueLock); |
1768 | |
1769 | if (QUEUE_POS_EQUAL(pos, head)) |
1770 | { |
1771 | /* Nothing to do, we have read all notifications already. */ |
1772 | return; |
1773 | } |
1774 | |
1775 | /* Get snapshot we'll use to decide which xacts are still in progress */ |
1776 | snapshot = RegisterSnapshot(GetLatestSnapshot()); |
1777 | |
1778 | /*---------- |
1779 | * Note that we deliver everything that we see in the queue and that |
1780 | * matches our _current_ listening state. |
     * In particular, we do not take into account different commit times.
1782 | * Consider the following example: |
1783 | * |
     * Backend 1:                        Backend 2:
     *
     * transaction starts
     * NOTIFY foo;
     * commit starts
     *                                   transaction starts
     *                                   LISTEN foo;
     *                                   commit starts
     * commit to clog
     *                                   commit to clog
1794 | * |
1795 | * It could happen that backend 2 sees the notification from backend 1 in |
1796 | * the queue. Even though the notifying transaction committed before |
1797 | * the listening transaction, we still deliver the notification. |
1798 | * |
     * The idea is that an additional notification does not do any harm; we
1800 | * just need to make sure that we do not miss a notification. |
1801 | * |
1802 | * It is possible that we fail while trying to send a message to our |
1803 | * frontend (for example, because of encoding conversion failure). |
1804 | * If that happens it is critical that we not try to send the same |
1805 | * message over and over again. Therefore, we place a PG_TRY block |
1806 | * here that will forcibly advance our backend position before we lose |
1807 | * control to an error. (We could alternatively retake AsyncQueueLock |
1808 | * and move the position before handling each individual message, but |
1809 | * that seems like too much lock traffic.) |
1810 | *---------- |
1811 | */ |
1812 | PG_TRY(); |
1813 | { |
1814 | bool reachedStop; |
1815 | |
1816 | do |
1817 | { |
1818 | int curpage = QUEUE_POS_PAGE(pos); |
1819 | int curoffset = QUEUE_POS_OFFSET(pos); |
1820 | int slotno; |
1821 | int copysize; |
1822 | |
1823 | /* |
1824 | * We copy the data from SLRU into a local buffer, so as to avoid |
1825 | * holding the AsyncCtlLock while we are examining the entries and |
1826 | * possibly transmitting them to our frontend. Copy only the part |
1827 | * of the page we will actually inspect. |
1828 | */ |
1829 | slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage, |
1830 | InvalidTransactionId); |
1831 | if (curpage == QUEUE_POS_PAGE(head)) |
1832 | { |
1833 | /* we only want to read as far as head */ |
1834 | copysize = QUEUE_POS_OFFSET(head) - curoffset; |
1835 | if (copysize < 0) |
1836 | copysize = 0; /* just for safety */ |
1837 | } |
1838 | else |
1839 | { |
1840 | /* fetch all the rest of the page */ |
1841 | copysize = QUEUE_PAGESIZE - curoffset; |
1842 | } |
1843 | memcpy(page_buffer.buf + curoffset, |
1844 | AsyncCtl->shared->page_buffer[slotno] + curoffset, |
1845 | copysize); |
1846 | /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ |
1847 | LWLockRelease(AsyncCtlLock); |
1848 | |
1849 | /* |
1850 | * Process messages up to the stop position, end of page, or an |
1851 | * uncommitted message. |
1852 | * |
1853 | * Our stop position is what we found to be the head's position |
1854 | * when we entered this function. It might have changed already. |
1855 | * But if it has, we will receive (or have already received and |
1856 | * queued) another signal and come here again. |
1857 | * |
1858 | * We are not holding AsyncQueueLock here! The queue can only |
1859 | * extend beyond the head pointer (see above) and we leave our |
1860 | * backend's pointer where it is so nobody will truncate or |
             * rewrite pages under us.  In particular, we don't want to hold
             * a lock while sending the notifications to the frontend.
1863 | */ |
1864 | reachedStop = asyncQueueProcessPageEntries(&pos, head, |
1865 | page_buffer.buf, |
1866 | snapshot); |
1867 | } while (!reachedStop); |
1868 | } |
1869 | PG_CATCH(); |
1870 | { |
1871 | /* Update shared state */ |
1872 | LWLockAcquire(AsyncQueueLock, LW_SHARED); |
1873 | QUEUE_BACKEND_POS(MyBackendId) = pos; |
1874 | advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL); |
1875 | LWLockRelease(AsyncQueueLock); |
1876 | |
1877 | /* If we were the laziest backend, try to advance the tail pointer */ |
1878 | if (advanceTail) |
1879 | asyncQueueAdvanceTail(); |
1880 | |
1881 | PG_RE_THROW(); |
1882 | } |
1883 | PG_END_TRY(); |
1884 | |
1885 | /* Update shared state */ |
1886 | LWLockAcquire(AsyncQueueLock, LW_SHARED); |
1887 | QUEUE_BACKEND_POS(MyBackendId) = pos; |
1888 | advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL); |
1889 | LWLockRelease(AsyncQueueLock); |
1890 | |
1891 | /* If we were the laziest backend, try to advance the tail pointer */ |
1892 | if (advanceTail) |
1893 | asyncQueueAdvanceTail(); |
1894 | |
1895 | /* Done with snapshot */ |
1896 | UnregisterSnapshot(snapshot); |
1897 | } |
1898 | |
1899 | /* |
1900 | * Fetch notifications from the shared queue, beginning at position current, |
1901 | * and deliver relevant ones to my frontend. |
1902 | * |
1903 | * The current page must have been fetched into page_buffer from shared |
1904 | * memory. (We could access the page right in shared memory, but that |
1905 | * would imply holding the AsyncCtlLock throughout this routine.) |
1906 | * |
1907 | * We stop if we reach the "stop" position, or reach a notification from an |
1908 | * uncommitted transaction, or reach the end of the page. |
1909 | * |
1910 | * The function returns true once we have reached the stop position or an |
1911 | * uncommitted notification, and false if we have finished with the page. |
1912 | * In other words: once it returns true there is no need to look further. |
1913 | * The QueuePosition *current is advanced past all processed messages. |
1914 | */ |
1915 | static bool |
1916 | asyncQueueProcessPageEntries(volatile QueuePosition *current, |
1917 | QueuePosition stop, |
1918 | char *page_buffer, |
1919 | Snapshot snapshot) |
1920 | { |
1921 | bool reachedStop = false; |
1922 | bool reachedEndOfPage; |
1923 | AsyncQueueEntry *qe; |
1924 | |
1925 | do |
1926 | { |
1927 | QueuePosition thisentry = *current; |
1928 | |
1929 | if (QUEUE_POS_EQUAL(thisentry, stop)) |
1930 | break; |
1931 | |
1932 | qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry)); |
1933 | |
1934 | /* |
1935 | * Advance *current over this message, possibly to the next page. As |
1936 | * noted in the comments for asyncQueueReadAllNotifications, we must |
1937 | * do this before possibly failing while processing the message. |
1938 | */ |
1939 | reachedEndOfPage = asyncQueueAdvance(current, qe->length); |
1940 | |
1941 | /* Ignore messages destined for other databases */ |
1942 | if (qe->dboid == MyDatabaseId) |
1943 | { |
1944 | if (XidInMVCCSnapshot(qe->xid, snapshot)) |
1945 | { |
1946 | /* |
1947 | * The source transaction is still in progress, so we can't |
1948 | * process this message yet. Break out of the loop, but first |
1949 | * back up *current so we will reprocess the message next |
1950 | * time. (Note: it is unlikely but not impossible for |
1951 | * TransactionIdDidCommit to fail, so we can't really avoid |
1952 | * this advance-then-back-up behavior when dealing with an |
1953 | * uncommitted message.) |
1954 | * |
1955 | * Note that we must test XidInMVCCSnapshot before we test |
1956 | * TransactionIdDidCommit, else we might return a message from |
1957 | * a transaction that is not yet visible to snapshots; compare |
1958 | * the comments at the head of heapam_visibility.c. |
1959 | * |
1960 | * Also, while our own xact won't be listed in the snapshot, |
1961 | * we need not check for TransactionIdIsCurrentTransactionId |
1962 | * because our transaction cannot (yet) have queued any |
1963 | * messages. |
1964 | */ |
1965 | *current = thisentry; |
1966 | reachedStop = true; |
1967 | break; |
1968 | } |
1969 | else if (TransactionIdDidCommit(qe->xid)) |
1970 | { |
1971 | /* qe->data is the null-terminated channel name */ |
1972 | char *channel = qe->data; |
1973 | |
1974 | if (IsListeningOn(channel)) |
1975 | { |
1976 | /* payload follows channel name */ |
1977 | char *payload = qe->data + strlen(channel) + 1; |
1978 | |
1979 | NotifyMyFrontEnd(channel, payload, qe->srcPid); |
1980 | } |
1981 | } |
1982 | else |
1983 | { |
1984 | /* |
1985 | * The source transaction aborted or crashed, so we just |
1986 | * ignore its notifications. |
1987 | */ |
1988 | } |
1989 | } |
1990 | |
1991 | /* Loop back if we're not at end of page */ |
1992 | } while (!reachedEndOfPage); |
1993 | |
1994 | if (QUEUE_POS_EQUAL(*current, stop)) |
1995 | reachedStop = true; |
1996 | |
1997 | return reachedStop; |
1998 | } |
1999 | |
2000 | /* |
2001 | * Advance the shared queue tail variable to the minimum of all the |
2002 | * per-backend tail pointers. Truncate pg_notify space if possible. |
2003 | */ |
2004 | static void |
2005 | asyncQueueAdvanceTail(void) |
2006 | { |
2007 | QueuePosition min; |
2008 | int i; |
2009 | int oldtailpage; |
2010 | int newtailpage; |
2011 | int boundary; |
2012 | |
2013 | LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); |
2014 | min = QUEUE_HEAD; |
2015 | for (i = 1; i <= MaxBackends; i++) |
2016 | { |
2017 | if (QUEUE_BACKEND_PID(i) != InvalidPid) |
2018 | min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); |
2019 | } |
2020 | oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL); |
2021 | QUEUE_TAIL = min; |
2022 | LWLockRelease(AsyncQueueLock); |
2023 | |
2024 | /* |
2025 | * We can truncate something if the global tail advanced across an SLRU |
2026 | * segment boundary. |
2027 | * |
2028 | * XXX it might be better to truncate only once every several segments, to |
2029 | * reduce the number of directory scans. |
2030 | */ |
2031 | newtailpage = QUEUE_POS_PAGE(min); |
2032 | boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT); |
2033 | if (asyncQueuePagePrecedes(oldtailpage, boundary)) |
2034 | { |
2035 | /* |
2036 | * SimpleLruTruncate() will ask for AsyncCtlLock but will also release |
2037 | * the lock again. |
2038 | */ |
2039 | SimpleLruTruncate(AsyncCtl, newtailpage); |
2040 | } |
2041 | } |
2042 | |
2043 | /* |
2044 | * ProcessIncomingNotify |
2045 | * |
2046 | * Deal with arriving NOTIFYs from other backends as soon as it's safe to |
2047 | * do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT |
2048 | * signal handler, but isn't anymore. |
2049 | * |
2050 | * Scan the queue for arriving notifications and report them to my front |
2051 | * end. |
2052 | * |
2053 | * NOTE: since we are outside any transaction, we must create our own. |
2054 | */ |
2055 | static void |
2056 | ProcessIncomingNotify(void) |
2057 | { |
2058 | /* We *must* reset the flag */ |
2059 | notifyInterruptPending = false; |
2060 | |
2061 | /* Do nothing else if we aren't actively listening */ |
2062 | if (listenChannels == NIL) |
2063 | return; |
2064 | |
2065 | if (Trace_notify) |
2066 | elog(DEBUG1, "ProcessIncomingNotify" ); |
2067 | |
2068 | set_ps_display("notify interrupt" , false); |
2069 | |
2070 | /* |
2071 | * We must run asyncQueueReadAllNotifications inside a transaction, else |
2072 | * bad things happen if it gets an error. |
2073 | */ |
2074 | StartTransactionCommand(); |
2075 | |
2076 | asyncQueueReadAllNotifications(); |
2077 | |
2078 | CommitTransactionCommand(); |
2079 | |
2080 | /* |
2081 | * Must flush the notify messages to ensure frontend gets them promptly. |
2082 | */ |
2083 | pq_flush(); |
2084 | |
2085 | set_ps_display("idle" , false); |
2086 | |
2087 | if (Trace_notify) |
2088 | elog(DEBUG1, "ProcessIncomingNotify: done" ); |
2089 | } |
2090 | |
2091 | /* |
2092 | * Send NOTIFY message to my front end. |
2093 | */ |
2094 | void |
2095 | NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) |
2096 | { |
2097 | if (whereToSendOutput == DestRemote) |
2098 | { |
2099 | StringInfoData buf; |
2100 | |
2101 | pq_beginmessage(&buf, 'A'); |
2102 | pq_sendint32(&buf, srcPid); |
2103 | pq_sendstring(&buf, channel); |
2104 | if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) |
2105 | pq_sendstring(&buf, payload); |
2106 | pq_endmessage(&buf); |
2107 | |
2108 | /* |
2109 | * NOTE: we do not do pq_flush() here. For a self-notify, it will |
2110 | * happen at the end of the transaction, and for incoming notifies |
2111 | * ProcessIncomingNotify will do it after finding all the notifies. |
2112 | */ |
2113 | } |
2114 | else |
2115 | elog(INFO, "NOTIFY for \"%s\" payload \"%s\"" , channel, payload); |
2116 | } |
2117 | |
2118 | /* Does pendingNotifies include the given channel/payload? */ |
2119 | static bool |
2120 | AsyncExistsPendingNotify(const char *channel, const char *payload) |
2121 | { |
2122 | ListCell *p; |
2123 | Notification *n; |
2124 | |
2125 | if (pendingNotifies == NIL) |
2126 | return false; |
2127 | |
2128 | if (payload == NULL) |
2129 | payload = "" ; |
2130 | |
2131 | /*---------- |
2132 | * We need to append new elements to the end of the list in order to keep |
     * the order.  On the other hand, we'd like to check the list backwards
     * in order to make duplicate-elimination a tad faster when the same
     * condition is signaled many times in a row.  So as a compromise we
     * check the tail element first, which we can access directly.  If this
2137 | * doesn't match, we check the whole list. |
2138 | * |
2139 | * As we are not checking our parents' lists, we can still get duplicates |
2140 | * in combination with subtransactions, like in: |
2141 | * |
2142 | * begin; |
2143 | * notify foo '1'; |
2144 | * savepoint foo; |
2145 | * notify foo '1'; |
2146 | * commit; |
2147 | *---------- |
2148 | */ |
2149 | n = (Notification *) llast(pendingNotifies); |
2150 | if (strcmp(n->channel, channel) == 0 && |
2151 | strcmp(n->payload, payload) == 0) |
2152 | return true; |
2153 | |
2154 | foreach(p, pendingNotifies) |
2155 | { |
2156 | n = (Notification *) lfirst(p); |
2157 | |
2158 | if (strcmp(n->channel, channel) == 0 && |
2159 | strcmp(n->payload, payload) == 0) |
2160 | return true; |
2161 | } |
2162 | |
2163 | return false; |
2164 | } |
2165 | |
2166 | /* Clear the pendingActions and pendingNotifies lists. */ |
2167 | static void |
2168 | ClearPendingActionsAndNotifies(void) |
2169 | { |
2170 | /* |
2171 | * We used to have to explicitly deallocate the list members and nodes, |
2172 | * because they were malloc'd. Now, since we know they are palloc'd in |
2173 | * CurTransactionContext, we need not do that --- they'll go away |
2174 | * automatically at transaction exit. We need only reset the list head |
2175 | * pointers. |
2176 | */ |
2177 | pendingActions = NIL; |
2178 | pendingNotifies = NIL; |
2179 | } |
2180 | |