| 1 | /*------------------------------------------------------------------------- |
| 2 | * |
| 3 | * pqmq.c |
| 4 | * Use the frontend/backend protocol for communication over a shm_mq |
| 5 | * |
| 6 | * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group |
| 7 | * Portions Copyright (c) 1994, Regents of the University of California |
| 8 | * |
| 9 | * src/backend/libpq/pqmq.c |
| 10 | * |
| 11 | *------------------------------------------------------------------------- |
| 12 | */ |
| 13 | |
| 14 | #include "postgres.h" |
| 15 | |
| 16 | #include "libpq/libpq.h" |
| 17 | #include "libpq/pqformat.h" |
| 18 | #include "libpq/pqmq.h" |
| 19 | #include "miscadmin.h" |
| 20 | #include "pgstat.h" |
| 21 | #include "tcop/tcopprot.h" |
| 22 | #include "utils/builtins.h" |
| 23 | |
| 24 | static shm_mq_handle *pq_mq_handle; |
| 25 | static bool pq_mq_busy = false; |
| 26 | static pid_t pq_mq_parallel_master_pid = 0; |
| 27 | static pid_t pq_mq_parallel_master_backend_id = InvalidBackendId; |
| 28 | |
| 29 | static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg); |
| 30 | static void mq_comm_reset(void); |
| 31 | static int mq_flush(void); |
| 32 | static int mq_flush_if_writable(void); |
| 33 | static bool mq_is_send_pending(void); |
| 34 | static int mq_putmessage(char msgtype, const char *s, size_t len); |
| 35 | static void mq_putmessage_noblock(char msgtype, const char *s, size_t len); |
| 36 | static void mq_startcopyout(void); |
| 37 | static void mq_endcopyout(bool errorAbort); |
| 38 | |
| 39 | static const PQcommMethods PqCommMqMethods = { |
| 40 | mq_comm_reset, |
| 41 | mq_flush, |
| 42 | mq_flush_if_writable, |
| 43 | mq_is_send_pending, |
| 44 | mq_putmessage, |
| 45 | mq_putmessage_noblock, |
| 46 | mq_startcopyout, |
| 47 | mq_endcopyout |
| 48 | }; |
| 49 | |
| 50 | /* |
| 51 | * Arrange to redirect frontend/backend protocol messages to a shared-memory |
| 52 | * message queue. |
| 53 | */ |
| 54 | void |
| 55 | pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh) |
| 56 | { |
| 57 | PqCommMethods = &PqCommMqMethods; |
| 58 | pq_mq_handle = mqh; |
| 59 | whereToSendOutput = DestRemote; |
| 60 | FrontendProtocol = PG_PROTOCOL_LATEST; |
| 61 | on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0); |
| 62 | } |
| 63 | |
| 64 | /* |
| 65 | * When the DSM that contains our shm_mq goes away, we need to stop sending |
| 66 | * messages to it. |
| 67 | */ |
| 68 | static void |
| 69 | pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg) |
| 70 | { |
| 71 | pq_mq_handle = NULL; |
| 72 | whereToSendOutput = DestNone; |
| 73 | } |
| 74 | |
| 75 | /* |
| 76 | * Arrange to SendProcSignal() to the parallel master each time we transmit |
| 77 | * message data via the shm_mq. |
| 78 | */ |
| 79 | void |
| 80 | pq_set_parallel_master(pid_t pid, BackendId backend_id) |
| 81 | { |
| 82 | Assert(PqCommMethods == &PqCommMqMethods); |
| 83 | pq_mq_parallel_master_pid = pid; |
| 84 | pq_mq_parallel_master_backend_id = backend_id; |
| 85 | } |
| 86 | |
/*
 * comm_reset method for the shm_mq transport: there is no state to reset.
 */
static void
mq_comm_reset(void)
{
	/* Intentionally empty. */
}
| 92 | |
/*
 * flush method for the shm_mq transport: a no-op that always returns 0.
 */
static int
mq_flush(void)
{
	return 0;					/* nothing to do */
}
| 99 | |
/*
 * flush_if_writable method for the shm_mq transport: a no-op that always
 * returns 0.
 */
static int
mq_flush_if_writable(void)
{
	return 0;					/* nothing to do */
}
| 106 | |
/*
 * is_send_pending method for the shm_mq transport.
 *
 * Always returns false: there's never anything pending.
 */
static bool
mq_is_send_pending(void)
{
	return false;
}
| 113 | |
| 114 | /* |
| 115 | * Transmit a libpq protocol message to the shared memory message queue |
| 116 | * selected via pq_mq_handle. We don't include a length word, because the |
| 117 | * receiver will know the length of the message from shm_mq_receive(). |
| 118 | */ |
| 119 | static int |
| 120 | mq_putmessage(char msgtype, const char *s, size_t len) |
| 121 | { |
| 122 | shm_mq_iovec iov[2]; |
| 123 | shm_mq_result result; |
| 124 | |
| 125 | /* |
| 126 | * If we're sending a message, and we have to wait because the queue is |
| 127 | * full, and then we get interrupted, and that interrupt results in trying |
| 128 | * to send another message, we respond by detaching the queue. There's no |
| 129 | * way to return to the original context, but even if there were, just |
| 130 | * queueing the message would amount to indefinitely postponing the |
| 131 | * response to the interrupt. So we do this instead. |
| 132 | */ |
| 133 | if (pq_mq_busy) |
| 134 | { |
| 135 | if (pq_mq_handle != NULL) |
| 136 | shm_mq_detach(pq_mq_handle); |
| 137 | pq_mq_handle = NULL; |
| 138 | return EOF; |
| 139 | } |
| 140 | |
| 141 | /* |
| 142 | * If the message queue is already gone, just ignore the message. This |
| 143 | * doesn't necessarily indicate a problem; for example, DEBUG messages can |
| 144 | * be generated late in the shutdown sequence, after all DSMs have already |
| 145 | * been detached. |
| 146 | */ |
| 147 | if (pq_mq_handle == NULL) |
| 148 | return 0; |
| 149 | |
| 150 | pq_mq_busy = true; |
| 151 | |
| 152 | iov[0].data = &msgtype; |
| 153 | iov[0].len = 1; |
| 154 | iov[1].data = s; |
| 155 | iov[1].len = len; |
| 156 | |
| 157 | Assert(pq_mq_handle != NULL); |
| 158 | |
| 159 | for (;;) |
| 160 | { |
| 161 | result = shm_mq_sendv(pq_mq_handle, iov, 2, true); |
| 162 | |
| 163 | if (pq_mq_parallel_master_pid != 0) |
| 164 | SendProcSignal(pq_mq_parallel_master_pid, |
| 165 | PROCSIG_PARALLEL_MESSAGE, |
| 166 | pq_mq_parallel_master_backend_id); |
| 167 | |
| 168 | if (result != SHM_MQ_WOULD_BLOCK) |
| 169 | break; |
| 170 | |
| 171 | (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, |
| 172 | WAIT_EVENT_MQ_PUT_MESSAGE); |
| 173 | ResetLatch(MyLatch); |
| 174 | CHECK_FOR_INTERRUPTS(); |
| 175 | } |
| 176 | |
| 177 | pq_mq_busy = false; |
| 178 | |
| 179 | Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED); |
| 180 | if (result != SHM_MQ_SUCCESS) |
| 181 | return EOF; |
| 182 | return 0; |
| 183 | } |
| 184 | |
| 185 | static void |
| 186 | mq_putmessage_noblock(char msgtype, const char *s, size_t len) |
| 187 | { |
| 188 | /* |
| 189 | * While the shm_mq machinery does support sending a message in |
| 190 | * non-blocking mode, there's currently no way to try sending beginning to |
| 191 | * send the message that doesn't also commit us to completing the |
| 192 | * transmission. This could be improved in the future, but for now we |
| 193 | * don't need it. |
| 194 | */ |
| 195 | elog(ERROR, "not currently supported" ); |
| 196 | } |
| 197 | |
/*
 * startcopyout method for the shm_mq transport: nothing needs to be done.
 */
static void
mq_startcopyout(void)
{
	/* Intentionally empty. */
}
| 203 | |
/*
 * endcopyout method for the shm_mq transport: nothing needs to be done, and
 * 'errorAbort' is ignored.
 */
static void
mq_endcopyout(bool errorAbort)
{
	/* Intentionally empty. */
}
| 209 | |
| 210 | /* |
| 211 | * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData |
| 212 | * structure with the results. |
| 213 | */ |
| 214 | void |
| 215 | pq_parse_errornotice(StringInfo msg, ErrorData *edata) |
| 216 | { |
| 217 | /* Initialize edata with reasonable defaults. */ |
| 218 | MemSet(edata, 0, sizeof(ErrorData)); |
| 219 | edata->elevel = ERROR; |
| 220 | edata->assoc_context = CurrentMemoryContext; |
| 221 | |
| 222 | /* Loop over fields and extract each one. */ |
| 223 | for (;;) |
| 224 | { |
| 225 | char code = pq_getmsgbyte(msg); |
| 226 | const char *value; |
| 227 | |
| 228 | if (code == '\0') |
| 229 | { |
| 230 | pq_getmsgend(msg); |
| 231 | break; |
| 232 | } |
| 233 | value = pq_getmsgrawstring(msg); |
| 234 | |
| 235 | switch (code) |
| 236 | { |
| 237 | case PG_DIAG_SEVERITY: |
| 238 | /* ignore, trusting we'll get a nonlocalized version */ |
| 239 | break; |
| 240 | case PG_DIAG_SEVERITY_NONLOCALIZED: |
| 241 | if (strcmp(value, "DEBUG" ) == 0) |
| 242 | { |
| 243 | /* |
| 244 | * We can't reconstruct the exact DEBUG level, but |
| 245 | * presumably it was >= client_min_messages, so select |
| 246 | * DEBUG1 to ensure we'll pass it on to the client. |
| 247 | */ |
| 248 | edata->elevel = DEBUG1; |
| 249 | } |
| 250 | else if (strcmp(value, "LOG" ) == 0) |
| 251 | { |
| 252 | /* |
| 253 | * It can't be LOG_SERVER_ONLY, or the worker wouldn't |
| 254 | * have sent it to us; so LOG is the correct value. |
| 255 | */ |
| 256 | edata->elevel = LOG; |
| 257 | } |
| 258 | else if (strcmp(value, "INFO" ) == 0) |
| 259 | edata->elevel = INFO; |
| 260 | else if (strcmp(value, "NOTICE" ) == 0) |
| 261 | edata->elevel = NOTICE; |
| 262 | else if (strcmp(value, "WARNING" ) == 0) |
| 263 | edata->elevel = WARNING; |
| 264 | else if (strcmp(value, "ERROR" ) == 0) |
| 265 | edata->elevel = ERROR; |
| 266 | else if (strcmp(value, "FATAL" ) == 0) |
| 267 | edata->elevel = FATAL; |
| 268 | else if (strcmp(value, "PANIC" ) == 0) |
| 269 | edata->elevel = PANIC; |
| 270 | else |
| 271 | elog(ERROR, "unrecognized error severity: \"%s\"" , value); |
| 272 | break; |
| 273 | case PG_DIAG_SQLSTATE: |
| 274 | if (strlen(value) != 5) |
| 275 | elog(ERROR, "invalid SQLSTATE: \"%s\"" , value); |
| 276 | edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2], |
| 277 | value[3], value[4]); |
| 278 | break; |
| 279 | case PG_DIAG_MESSAGE_PRIMARY: |
| 280 | edata->message = pstrdup(value); |
| 281 | break; |
| 282 | case PG_DIAG_MESSAGE_DETAIL: |
| 283 | edata->detail = pstrdup(value); |
| 284 | break; |
| 285 | case PG_DIAG_MESSAGE_HINT: |
| 286 | edata->hint = pstrdup(value); |
| 287 | break; |
| 288 | case PG_DIAG_STATEMENT_POSITION: |
| 289 | edata->cursorpos = pg_strtoint32(value); |
| 290 | break; |
| 291 | case PG_DIAG_INTERNAL_POSITION: |
| 292 | edata->internalpos = pg_strtoint32(value); |
| 293 | break; |
| 294 | case PG_DIAG_INTERNAL_QUERY: |
| 295 | edata->internalquery = pstrdup(value); |
| 296 | break; |
| 297 | case PG_DIAG_CONTEXT: |
| 298 | edata->context = pstrdup(value); |
| 299 | break; |
| 300 | case PG_DIAG_SCHEMA_NAME: |
| 301 | edata->schema_name = pstrdup(value); |
| 302 | break; |
| 303 | case PG_DIAG_TABLE_NAME: |
| 304 | edata->table_name = pstrdup(value); |
| 305 | break; |
| 306 | case PG_DIAG_COLUMN_NAME: |
| 307 | edata->column_name = pstrdup(value); |
| 308 | break; |
| 309 | case PG_DIAG_DATATYPE_NAME: |
| 310 | edata->datatype_name = pstrdup(value); |
| 311 | break; |
| 312 | case PG_DIAG_CONSTRAINT_NAME: |
| 313 | edata->constraint_name = pstrdup(value); |
| 314 | break; |
| 315 | case PG_DIAG_SOURCE_FILE: |
| 316 | edata->filename = pstrdup(value); |
| 317 | break; |
| 318 | case PG_DIAG_SOURCE_LINE: |
| 319 | edata->lineno = pg_strtoint32(value); |
| 320 | break; |
| 321 | case PG_DIAG_SOURCE_FUNCTION: |
| 322 | edata->funcname = pstrdup(value); |
| 323 | break; |
| 324 | default: |
| 325 | elog(ERROR, "unrecognized error field code: %d" , (int) code); |
| 326 | break; |
| 327 | } |
| 328 | } |
| 329 | } |
| 330 | |