1 | /*------------------------------------------------------------------------- |
2 | * |
3 | * pqmq.c |
4 | * Use the frontend/backend protocol for communication over a shm_mq |
5 | * |
6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
7 | * Portions Copyright (c) 1994, Regents of the University of California |
8 | * |
9 | * src/backend/libpq/pqmq.c |
10 | * |
11 | *------------------------------------------------------------------------- |
12 | */ |
13 | |
14 | #include "postgres.h" |
15 | |
16 | #include "libpq/libpq.h" |
17 | #include "libpq/pqformat.h" |
18 | #include "libpq/pqmq.h" |
19 | #include "miscadmin.h" |
20 | #include "pgstat.h" |
21 | #include "tcop/tcopprot.h" |
22 | #include "utils/builtins.h" |
23 | |
24 | static shm_mq_handle *pq_mq_handle; |
25 | static bool pq_mq_busy = false; |
26 | static pid_t pq_mq_parallel_master_pid = 0; |
27 | static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId; |
28 | |
/* DSM detach callback registered by pq_redirect_to_shm_mq(). */
static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg);

/*
 * Callbacks collected in PqCommMqMethods below, declared in the same order
 * in which they appear in that table.
 */
static void mq_comm_reset(void);
static int	mq_flush(void);
static int	mq_flush_if_writable(void);
static bool mq_is_send_pending(void);
static int	mq_putmessage(char msgtype, const char *s, size_t len);
static void mq_putmessage_noblock(char msgtype, const char *s, size_t len);
static void mq_startcopyout(void);
static void mq_endcopyout(bool errorAbort);
38 | |
/*
 * Method table that pq_redirect_to_shm_mq() installs into PqCommMethods.
 *
 * The initializers are positional, so their order has to match the member
 * order of the PQcommMethods declaration (not visible in this file).
 */
static const PQcommMethods PqCommMqMethods = {
	mq_comm_reset,
	mq_flush,
	mq_flush_if_writable,
	mq_is_send_pending,
	mq_putmessage,
	mq_putmessage_noblock,
	mq_startcopyout,
	mq_endcopyout
};
49 | |
50 | /* |
51 | * Arrange to redirect frontend/backend protocol messages to a shared-memory |
52 | * message queue. |
53 | */ |
54 | void |
55 | pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh) |
56 | { |
57 | PqCommMethods = &PqCommMqMethods; |
58 | pq_mq_handle = mqh; |
59 | whereToSendOutput = DestRemote; |
60 | FrontendProtocol = PG_PROTOCOL_LATEST; |
61 | on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0); |
62 | } |
63 | |
64 | /* |
65 | * When the DSM that contains our shm_mq goes away, we need to stop sending |
66 | * messages to it. |
67 | */ |
68 | static void |
69 | pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg) |
70 | { |
71 | pq_mq_handle = NULL; |
72 | whereToSendOutput = DestNone; |
73 | } |
74 | |
75 | /* |
76 | * Arrange to SendProcSignal() to the parallel master each time we transmit |
77 | * message data via the shm_mq. |
78 | */ |
79 | void |
80 | pq_set_parallel_master(pid_t pid, BackendId backend_id) |
81 | { |
82 | Assert(PqCommMethods == &PqCommMqMethods); |
83 | pq_mq_parallel_master_pid = pid; |
84 | pq_mq_parallel_master_backend_id = backend_id; |
85 | } |
86 | |
/*
 * First entry of PqCommMqMethods.  Intentionally does nothing for the
 * shm_mq transport.
 */
static void
mq_comm_reset(void)
{
}
92 | |
/*
 * Flush callback for the shm_mq transport.  There is no work to do here,
 * so this simply reports success.
 *
 * Returns 0 unconditionally.
 */
static int
mq_flush(void)
{
	return 0;
}
99 | |
/*
 * Flush-if-writable callback for the shm_mq transport.  As with mq_flush(),
 * there is no work to do.
 *
 * Returns 0 unconditionally.
 */
static int
mq_flush_if_writable(void)
{
	return 0;
}
106 | |
/*
 * Report whether unsent output is pending on the shm_mq transport.
 *
 * Returns false unconditionally: there is never anything pending.  The
 * result is spelled "false" rather than 0 to match the declared bool
 * return type.
 */
static bool
mq_is_send_pending(void)
{
	return false;
}
113 | |
/*
 * Transmit a libpq protocol message to the shared memory message queue
 * selected via pq_mq_handle.  We don't include a length word, because the
 * receiver will know the length of the message from shm_mq_receive().
 *
 * msgtype: protocol message type byte; sent as the first byte of the queue
 *			message.
 * s, len:	message payload and its length in bytes; sent right after the
 *			type byte.
 *
 * Returns 0 if the message was sent, or if there is no queue and the message
 * was silently dropped.  Returns EOF if the send reported anything other
 * than SHM_MQ_SUCCESS, or if this call re-entered a send already in progress
 * (in which case the queue is detached first).
 */
static int
mq_putmessage(char msgtype, const char *s, size_t len)
{
	shm_mq_iovec iov[2];
	shm_mq_result result;

	/*
	 * If we're sending a message, and we have to wait because the queue is
	 * full, and then we get interrupted, and that interrupt results in trying
	 * to send another message, we respond by detaching the queue.  There's no
	 * way to return to the original context, but even if there were, just
	 * queueing the message would amount to indefinitely postponing the
	 * response to the interrupt.  So we do this instead.
	 *
	 * Note that pq_mq_busy is not cleared on this path, so every later call
	 * also takes this branch and returns EOF.
	 */
	if (pq_mq_busy)
	{
		if (pq_mq_handle != NULL)
			shm_mq_detach(pq_mq_handle);
		pq_mq_handle = NULL;
		return EOF;
	}

	/*
	 * If the message queue is already gone, just ignore the message.  This
	 * doesn't necessarily indicate a problem; for example, DEBUG messages can
	 * be generated late in the shutdown sequence, after all DSMs have already
	 * been detached.
	 */
	if (pq_mq_handle == NULL)
		return 0;

	/* From here until the loop exits, a nested call counts as re-entry. */
	pq_mq_busy = true;

	/* Two-part message: the type byte, then the caller's payload. */
	iov[0].data = &msgtype;
	iov[0].len = 1;
	iov[1].data = s;
	iov[1].len = len;

	Assert(pq_mq_handle != NULL);

	for (;;)
	{
		/*
		 * The final "true" presumably selects non-blocking mode, given that
		 * SHM_MQ_WOULD_BLOCK is handled below -- confirm against shm_mq.h.
		 */
		result = shm_mq_sendv(pq_mq_handle, iov, 2, true);

		/*
		 * Signal the registered parallel master after every attempt, even
		 * one that would block.
		 */
		if (pq_mq_parallel_master_pid != 0)
			SendProcSignal(pq_mq_parallel_master_pid,
						   PROCSIG_PARALLEL_MESSAGE,
						   pq_mq_parallel_master_backend_id);

		if (result != SHM_MQ_WOULD_BLOCK)
			break;

		/*
		 * Queue is full: sleep until our latch is set, then retry.
		 *
		 * NOTE(review): if interrupt processing below re-enters this function
		 * and then returns normally, the pq_mq_busy branch above will have
		 * set pq_mq_handle to NULL before the next shm_mq_sendv() call --
		 * confirm that this cannot happen.
		 */
		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
						 WAIT_EVENT_MQ_PUT_MESSAGE);
		ResetLatch(MyLatch);
		CHECK_FOR_INTERRUPTS();
	}

	pq_mq_busy = false;

	/* Only success or a detached queue is expected once the loop exits. */
	Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
	if (result != SHM_MQ_SUCCESS)
		return EOF;
	return 0;
}
184 | |
185 | static void |
186 | mq_putmessage_noblock(char msgtype, const char *s, size_t len) |
187 | { |
188 | /* |
189 | * While the shm_mq machinery does support sending a message in |
190 | * non-blocking mode, there's currently no way to try sending beginning to |
191 | * send the message that doesn't also commit us to completing the |
192 | * transmission. This could be improved in the future, but for now we |
193 | * don't need it. |
194 | */ |
195 | elog(ERROR, "not currently supported" ); |
196 | } |
197 | |
/*
 * Start-of-copy-out callback for the shm_mq transport.  Intentionally does
 * nothing.
 */
static void
mq_startcopyout(void)
{
}
203 | |
/*
 * End-of-copy-out callback for the shm_mq transport.  Intentionally does
 * nothing; errorAbort is ignored.
 */
static void
mq_endcopyout(bool errorAbort)
{
}
209 | |
210 | /* |
211 | * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData |
212 | * structure with the results. |
213 | */ |
214 | void |
215 | pq_parse_errornotice(StringInfo msg, ErrorData *edata) |
216 | { |
217 | /* Initialize edata with reasonable defaults. */ |
218 | MemSet(edata, 0, sizeof(ErrorData)); |
219 | edata->elevel = ERROR; |
220 | edata->assoc_context = CurrentMemoryContext; |
221 | |
222 | /* Loop over fields and extract each one. */ |
223 | for (;;) |
224 | { |
225 | char code = pq_getmsgbyte(msg); |
226 | const char *value; |
227 | |
228 | if (code == '\0') |
229 | { |
230 | pq_getmsgend(msg); |
231 | break; |
232 | } |
233 | value = pq_getmsgrawstring(msg); |
234 | |
235 | switch (code) |
236 | { |
237 | case PG_DIAG_SEVERITY: |
238 | /* ignore, trusting we'll get a nonlocalized version */ |
239 | break; |
240 | case PG_DIAG_SEVERITY_NONLOCALIZED: |
241 | if (strcmp(value, "DEBUG" ) == 0) |
242 | { |
243 | /* |
244 | * We can't reconstruct the exact DEBUG level, but |
245 | * presumably it was >= client_min_messages, so select |
246 | * DEBUG1 to ensure we'll pass it on to the client. |
247 | */ |
248 | edata->elevel = DEBUG1; |
249 | } |
250 | else if (strcmp(value, "LOG" ) == 0) |
251 | { |
252 | /* |
253 | * It can't be LOG_SERVER_ONLY, or the worker wouldn't |
254 | * have sent it to us; so LOG is the correct value. |
255 | */ |
256 | edata->elevel = LOG; |
257 | } |
258 | else if (strcmp(value, "INFO" ) == 0) |
259 | edata->elevel = INFO; |
260 | else if (strcmp(value, "NOTICE" ) == 0) |
261 | edata->elevel = NOTICE; |
262 | else if (strcmp(value, "WARNING" ) == 0) |
263 | edata->elevel = WARNING; |
264 | else if (strcmp(value, "ERROR" ) == 0) |
265 | edata->elevel = ERROR; |
266 | else if (strcmp(value, "FATAL" ) == 0) |
267 | edata->elevel = FATAL; |
268 | else if (strcmp(value, "PANIC" ) == 0) |
269 | edata->elevel = PANIC; |
270 | else |
271 | elog(ERROR, "unrecognized error severity: \"%s\"" , value); |
272 | break; |
273 | case PG_DIAG_SQLSTATE: |
274 | if (strlen(value) != 5) |
275 | elog(ERROR, "invalid SQLSTATE: \"%s\"" , value); |
276 | edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2], |
277 | value[3], value[4]); |
278 | break; |
279 | case PG_DIAG_MESSAGE_PRIMARY: |
280 | edata->message = pstrdup(value); |
281 | break; |
282 | case PG_DIAG_MESSAGE_DETAIL: |
283 | edata->detail = pstrdup(value); |
284 | break; |
285 | case PG_DIAG_MESSAGE_HINT: |
286 | edata->hint = pstrdup(value); |
287 | break; |
288 | case PG_DIAG_STATEMENT_POSITION: |
289 | edata->cursorpos = pg_strtoint32(value); |
290 | break; |
291 | case PG_DIAG_INTERNAL_POSITION: |
292 | edata->internalpos = pg_strtoint32(value); |
293 | break; |
294 | case PG_DIAG_INTERNAL_QUERY: |
295 | edata->internalquery = pstrdup(value); |
296 | break; |
297 | case PG_DIAG_CONTEXT: |
298 | edata->context = pstrdup(value); |
299 | break; |
300 | case PG_DIAG_SCHEMA_NAME: |
301 | edata->schema_name = pstrdup(value); |
302 | break; |
303 | case PG_DIAG_TABLE_NAME: |
304 | edata->table_name = pstrdup(value); |
305 | break; |
306 | case PG_DIAG_COLUMN_NAME: |
307 | edata->column_name = pstrdup(value); |
308 | break; |
309 | case PG_DIAG_DATATYPE_NAME: |
310 | edata->datatype_name = pstrdup(value); |
311 | break; |
312 | case PG_DIAG_CONSTRAINT_NAME: |
313 | edata->constraint_name = pstrdup(value); |
314 | break; |
315 | case PG_DIAG_SOURCE_FILE: |
316 | edata->filename = pstrdup(value); |
317 | break; |
318 | case PG_DIAG_SOURCE_LINE: |
319 | edata->lineno = pg_strtoint32(value); |
320 | break; |
321 | case PG_DIAG_SOURCE_FUNCTION: |
322 | edata->funcname = pstrdup(value); |
323 | break; |
324 | default: |
325 | elog(ERROR, "unrecognized error field code: %d" , (int) code); |
326 | break; |
327 | } |
328 | } |
329 | } |
330 | |