1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * sinval.c |
4 | * POSTGRES shared cache invalidation communication code. |
5 | * |
6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
7 | * Portions Copyright (c) 1994, Regents of the University of California |
8 | * |
9 | * |
10 | * IDENTIFICATION |
11 | * src/backend/storage/ipc/sinval.c |
12 | * |
13 | *------------------------------------------------------------------------- |
14 | */ |
15 | #include "postgres.h" |
16 | |
17 | #include "access/xact.h" |
18 | #include "commands/async.h" |
19 | #include "miscadmin.h" |
20 | #include "storage/ipc.h" |
21 | #include "storage/proc.h" |
22 | #include "storage/sinvaladt.h" |
23 | #include "utils/inval.h" |
24 | |
25 | |
/*
 * Count of shared-invalidation events handled by
 * ReceiveSharedInvalidMessages(): it is bumped once for every message passed
 * to the caller's invalFunction and once for every queue reset reported to
 * the caller's resetFunction.  Nothing in this file ever resets it.
 */
uint64 SharedInvalidMessageCounter;
27 | |
28 | |
/*
 * Because backends sitting idle will not be reading sinval events, we
 * need a way to give an idle backend a swift kick in the rear and make
 * it catch up before the sinval queue overflows and forces it to go
 * through a cache reset exercise.  This is done by sending
 * PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind.
 *
 * The signal handler will set an interrupt pending flag and will set the
 * process's latch.  Whenever starting to read from the client, or when
 * interrupted while doing so, ProcessClientReadInterrupt() will call
 * ProcessCatchupInterrupt().
 *
 * Within this file the flag is set by HandleCatchupInterrupt() and cleared
 * by ReceiveSharedInvalidMessages() once it has caught up.
 */
volatile sig_atomic_t catchupInterruptPending = false;
42 | |
43 | |
44 | /* |
45 | * SendSharedInvalidMessages |
46 | * Add shared-cache-invalidation message(s) to the global SI message queue. |
47 | */ |
48 | void |
49 | SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n) |
50 | { |
51 | SIInsertDataEntries(msgs, n); |
52 | } |
53 | |
54 | /* |
55 | * ReceiveSharedInvalidMessages |
56 | * Process shared-cache-invalidation messages waiting for this backend |
57 | * |
58 | * We guarantee to process all messages that had been queued before the |
59 | * routine was entered. It is of course possible for more messages to get |
60 | * queued right after our last SIGetDataEntries call. |
61 | * |
62 | * NOTE: it is entirely possible for this routine to be invoked recursively |
63 | * as a consequence of processing inside the invalFunction or resetFunction. |
64 | * Furthermore, such a recursive call must guarantee that all outstanding |
65 | * inval messages have been processed before it exits. This is the reason |
66 | * for the strange-looking choice to use a statically allocated buffer array |
67 | * and counters; it's so that a recursive call can process messages already |
68 | * sucked out of sinvaladt.c. |
69 | */ |
70 | void |
71 | ReceiveSharedInvalidMessages(void (*invalFunction) (SharedInvalidationMessage *msg), |
72 | void (*resetFunction) (void)) |
73 | { |
74 | #define MAXINVALMSGS 32 |
75 | static SharedInvalidationMessage messages[MAXINVALMSGS]; |
76 | |
77 | /* |
78 | * We use volatile here to prevent bugs if a compiler doesn't realize that |
79 | * recursion is a possibility ... |
80 | */ |
81 | static volatile int nextmsg = 0; |
82 | static volatile int nummsgs = 0; |
83 | |
84 | /* Deal with any messages still pending from an outer recursion */ |
85 | while (nextmsg < nummsgs) |
86 | { |
87 | SharedInvalidationMessage msg = messages[nextmsg++]; |
88 | |
89 | SharedInvalidMessageCounter++; |
90 | invalFunction(&msg); |
91 | } |
92 | |
93 | do |
94 | { |
95 | int getResult; |
96 | |
97 | nextmsg = nummsgs = 0; |
98 | |
99 | /* Try to get some more messages */ |
100 | getResult = SIGetDataEntries(messages, MAXINVALMSGS); |
101 | |
102 | if (getResult < 0) |
103 | { |
104 | /* got a reset message */ |
105 | elog(DEBUG4, "cache state reset" ); |
106 | SharedInvalidMessageCounter++; |
107 | resetFunction(); |
108 | break; /* nothing more to do */ |
109 | } |
110 | |
111 | /* Process them, being wary that a recursive call might eat some */ |
112 | nextmsg = 0; |
113 | nummsgs = getResult; |
114 | |
115 | while (nextmsg < nummsgs) |
116 | { |
117 | SharedInvalidationMessage msg = messages[nextmsg++]; |
118 | |
119 | SharedInvalidMessageCounter++; |
120 | invalFunction(&msg); |
121 | } |
122 | |
123 | /* |
124 | * We only need to loop if the last SIGetDataEntries call (which might |
125 | * have been within a recursive call) returned a full buffer. |
126 | */ |
127 | } while (nummsgs == MAXINVALMSGS); |
128 | |
129 | /* |
130 | * We are now caught up. If we received a catchup signal, reset that |
131 | * flag, and call SICleanupQueue(). This is not so much because we need |
132 | * to flush dead messages right now, as that we want to pass on the |
133 | * catchup signal to the next slowest backend. "Daisy chaining" the |
134 | * catchup signal this way avoids creating spikes in system load for what |
135 | * should be just a background maintenance activity. |
136 | */ |
137 | if (catchupInterruptPending) |
138 | { |
139 | catchupInterruptPending = false; |
140 | elog(DEBUG4, "sinval catchup complete, cleaning queue" ); |
141 | SICleanupQueue(false, 0); |
142 | } |
143 | } |
144 | |
145 | |
146 | /* |
147 | * HandleCatchupInterrupt |
148 | * |
149 | * This is called when PROCSIG_CATCHUP_INTERRUPT is received. |
150 | * |
151 | * We used to directly call ProcessCatchupEvent directly when idle. These days |
152 | * we just set a flag to do it later and notify the process of that fact by |
153 | * setting the process's latch. |
154 | */ |
155 | void |
156 | HandleCatchupInterrupt(void) |
157 | { |
158 | /* |
159 | * Note: this is called by a SIGNAL HANDLER. You must be very wary what |
160 | * you do here. |
161 | */ |
162 | |
163 | catchupInterruptPending = true; |
164 | |
165 | /* make sure the event is processed in due course */ |
166 | SetLatch(MyLatch); |
167 | } |
168 | |
169 | /* |
170 | * ProcessCatchupInterrupt |
171 | * |
172 | * The portion of catchup interrupt handling that runs outside of the signal |
173 | * handler, which allows it to actually process pending invalidations. |
174 | */ |
175 | void |
176 | ProcessCatchupInterrupt(void) |
177 | { |
178 | while (catchupInterruptPending) |
179 | { |
180 | /* |
181 | * What we need to do here is cause ReceiveSharedInvalidMessages() to |
182 | * run, which will do the necessary work and also reset the |
183 | * catchupInterruptPending flag. If we are inside a transaction we |
184 | * can just call AcceptInvalidationMessages() to do this. If we |
185 | * aren't, we start and immediately end a transaction; the call to |
186 | * AcceptInvalidationMessages() happens down inside transaction start. |
187 | * |
188 | * It is awfully tempting to just call AcceptInvalidationMessages() |
189 | * without the rest of the xact start/stop overhead, and I think that |
190 | * would actually work in the normal case; but I am not sure that |
191 | * things would clean up nicely if we got an error partway through. |
192 | */ |
193 | if (IsTransactionOrTransactionBlock()) |
194 | { |
195 | elog(DEBUG4, "ProcessCatchupEvent inside transaction" ); |
196 | AcceptInvalidationMessages(); |
197 | } |
198 | else |
199 | { |
200 | elog(DEBUG4, "ProcessCatchupEvent outside transaction" ); |
201 | StartTransactionCommand(); |
202 | CommitTransactionCommand(); |
203 | } |
204 | } |
205 | } |
206 | |