1 | /* |
2 | * thr_info_port.c |
3 | * |
4 | * Copyright (C) 2008-2014 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | #include "base/thr_info_port.h" |
24 | |
25 | #include <errno.h> |
26 | #include <stdbool.h> |
27 | #include <stddef.h> |
28 | #include <stdint.h> |
29 | #include <string.h> |
30 | #include <unistd.h> |
31 | #include <sys/ioctl.h> |
32 | |
33 | #include "citrusleaf/alloc.h" |
34 | #include "citrusleaf/cf_atomic.h" |
35 | |
36 | #include "cf_str.h" |
37 | #include "cf_thread.h" |
38 | #include "dynbuf.h" |
39 | #include "fault.h" |
40 | #include "socket.h" |
41 | |
42 | #include "base/cfg.h" |
43 | #include "base/thr_info.h" |
44 | |
45 | #define POLL_SZ 1024 |
46 | |
47 | // State for any open info port. |
48 | typedef struct { |
49 | int recv_pos; |
50 | int recv_alloc; |
51 | uint8_t *recv_buf; |
52 | |
53 | int xmit_pos; // where we're currently writing |
54 | int xmit_limit; // the end of the write buffer |
55 | int xmit_alloc; |
56 | uint8_t *xmit_buf; |
57 | |
58 | cf_socket sock; |
59 | |
60 | } info_port_state; |
61 | |
62 | cf_serv_cfg g_info_bind = { .n_cfgs = 0 }; |
63 | cf_ip_port g_info_port = 0; |
64 | |
65 | static cf_sockets g_sockets; |
66 | |
67 | // Using int for 4-byte size, but maintaining bool semantics. |
68 | static volatile int g_started = false; |
69 | |
70 | void |
71 | info_port_state_free(info_port_state *ips) |
72 | { |
73 | if (ips->recv_buf) cf_free(ips->recv_buf); |
74 | if (ips->xmit_buf) cf_free(ips->xmit_buf); |
75 | cf_socket_close(&ips->sock); |
76 | cf_socket_term(&ips->sock); |
77 | memset(ips, -1, sizeof(info_port_state)); |
78 | cf_free(ips); |
79 | } |
80 | |
81 | |
82 | int |
83 | thr_info_port_readable(info_port_state *ips) |
84 | { |
85 | int sz = cf_socket_available(&ips->sock); |
86 | |
87 | if (sz == 0) { |
88 | return 0; |
89 | } |
90 | |
91 | // Make sure we've got some reasonable space in the read buffer. |
92 | if (ips->recv_alloc - ips->recv_pos < sz) { |
93 | int new_sz = sz + ips->recv_pos + 100; |
94 | ips->recv_buf = cf_realloc(ips->recv_buf, new_sz); |
95 | ips->recv_alloc = new_sz; |
96 | } |
97 | |
98 | int n = cf_socket_recv(&ips->sock, ips->recv_buf + ips->recv_pos, ips->recv_alloc - ips->recv_pos, 0); |
99 | if (n < 0) { |
100 | if (errno != EAGAIN) { |
101 | cf_detail(AS_INFO_PORT, "info socket: read fail: error: rv %d sz was %d errno %d" , n, ips->recv_alloc - ips->recv_pos, errno); |
102 | } |
103 | return -1; |
104 | } |
105 | ips->recv_pos += n; |
106 | |
107 | // What about a control-c? |
108 | if (-1 != cf_str_strnchr(ips->recv_buf, ips->recv_pos, 0xFF)) { |
109 | cf_debug(AS_INFO_PORT, "recived a control c, aborting" ); |
110 | return -1; |
111 | } |
112 | |
113 | // See if we've got a CR or LF in the buf yet. |
114 | int cr = cf_str_strnchr(ips->recv_buf, ips->recv_pos, '\r'); |
115 | int lf = cf_str_strnchr(ips->recv_buf, ips->recv_pos, '\n'); |
116 | if ((cr >= 0) || (lf >= 0)) { |
117 | size_t len; |
118 | // Take the closest of cr or lf. |
119 | if (-1 == lf) { |
120 | len = cr; |
121 | } |
122 | else if (-1 == cr) { |
123 | len = lf; |
124 | } |
125 | else { |
126 | len = lf < cr ? lf : cr; |
127 | } |
128 | |
129 | // We have a message. Process it. |
130 | cf_dyn_buf_define(db); |
131 | |
132 | ips->recv_buf[len] = '\n'; |
133 | len++; |
134 | |
135 | // Fill out the db buffer with the response (always returns 0). |
136 | as_info_buffer(ips->recv_buf, len, &db); |
137 | if (db.used_sz == 0) cf_dyn_buf_append_char(&db, '\n'); |
138 | |
139 | // See if it has a tab, get that location. It probably does. |
140 | int tab = cf_str_strnchr(db.buf, db.used_sz , '\t'); |
141 | tab++; |
142 | |
143 | while (len < ips->recv_pos && |
144 | ((ips->recv_buf[len] == '\r') || (ips->recv_buf[len] == '\n'))) { |
145 | |
146 | len ++ ; |
147 | } |
148 | |
149 | // Move transmit buffer forward. |
150 | if (ips->recv_pos - len > 0) { |
151 | memmove(ips->recv_buf, ips->recv_buf + len, ips->recv_pos - len); |
152 | ips->recv_pos -= len; |
153 | } |
154 | else { |
155 | ips->recv_pos = 0; |
156 | } |
157 | |
158 | // Queue the response - set to the xmit buf. |
159 | if (ips->xmit_alloc - ips->xmit_limit < db.used_sz) { |
160 | ips->xmit_buf = cf_realloc(ips->xmit_buf, db.used_sz + ips->xmit_limit); |
161 | ips->xmit_alloc = db.used_sz + ips->xmit_limit; |
162 | } |
163 | memcpy(ips->xmit_buf + ips->xmit_limit, db.buf + tab, db.used_sz - tab); |
164 | ips->xmit_limit += db.used_sz - tab; |
165 | |
166 | cf_dyn_buf_free(&db); |
167 | } |
168 | |
169 | return 0; |
170 | } |
171 | |
172 | |
173 | int |
174 | thr_info_port_writable(info_port_state *ips) |
175 | { |
176 | // Do we have bytes to write? |
177 | if (ips->xmit_limit > 0) { |
178 | |
179 | // Write them! |
180 | int rv = cf_socket_send(&ips->sock, ips->xmit_buf + ips->xmit_pos, ips->xmit_limit - ips->xmit_pos , MSG_NOSIGNAL); |
181 | if (rv < 0) { |
182 | if (errno != EAGAIN) { |
183 | return -1; |
184 | } |
185 | } |
186 | else if (rv == 0) { |
187 | cf_debug(AS_INFO_PORT, "send with return value 0" ); |
188 | return 0; |
189 | } |
190 | else { |
191 | ips->xmit_pos += rv; |
192 | if (ips->xmit_pos == ips->xmit_limit) { |
193 | ips->xmit_pos = ips->xmit_limit = 0; |
194 | } |
195 | } |
196 | } |
197 | |
198 | return 0; |
199 | } |
200 | |
201 | |
202 | // Demarshal info socket connections. |
203 | void * |
204 | run_info_port(void *arg) |
205 | { |
206 | cf_poll poll; |
207 | cf_debug(AS_INFO_PORT, "Info port process started" ); |
208 | |
209 | // Start the listener socket. Note that because this is done after privilege |
210 | // de-escalation, we can't use privileged ports. |
211 | |
212 | if (cf_socket_init_server(&g_info_bind, &g_sockets) < 0) { |
213 | cf_crash(AS_INFO_PORT, "Couldn't initialize service sockets" ); |
214 | } |
215 | |
216 | cf_poll_create(&poll); |
217 | cf_poll_add_sockets(poll, &g_sockets, EPOLLIN | EPOLLERR | EPOLLHUP); |
218 | cf_socket_show_server(AS_INFO_PORT, "info" , &g_sockets); |
219 | |
220 | g_started = true; |
221 | |
222 | while (true) { |
223 | cf_poll_event events[POLL_SZ]; |
224 | int32_t n_ev = cf_poll_wait(poll, events, POLL_SZ, -1); |
225 | |
226 | for (int32_t i = 0; i < n_ev; ++i) { |
227 | cf_socket *ssock = events[i].data; |
228 | |
229 | if (cf_sockets_has_socket(&g_sockets, ssock)) { |
230 | cf_socket csock; |
231 | cf_sock_addr addr; |
232 | |
233 | if (cf_socket_accept(ssock, &csock, &addr) < 0) { |
234 | // This means we're out of file descriptors. |
235 | if (errno == EMFILE) { |
236 | cf_warning(AS_INFO_PORT, "Too many file descriptors in use, consider raising limit" ); |
237 | continue; |
238 | } |
239 | |
240 | cf_crash(AS_INFO_PORT, "cf_socket_accept() failed" ); |
241 | } |
242 | |
243 | cf_detail(AS_INFO_PORT, "New connection: %s" , cf_sock_addr_print(&addr)); |
244 | info_port_state *ips = cf_malloc(sizeof(info_port_state)); |
245 | |
246 | ips->recv_pos = 0; |
247 | ips->recv_alloc = 100; |
248 | ips->recv_buf = cf_malloc(100); |
249 | ips->xmit_limit = ips->xmit_pos = 0; |
250 | ips->xmit_alloc = 100; |
251 | ips->xmit_buf = cf_malloc(100); |
252 | cf_socket_copy(&csock, &ips->sock); |
253 | |
254 | cf_poll_add_socket(poll, &csock, EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP, ips); |
255 | } |
256 | else { |
257 | info_port_state *ips = events[i].data; |
258 | |
259 | if (ips == NULL) { |
260 | cf_crash(AS_INFO_PORT, "Event with null handle" ); |
261 | } |
262 | |
263 | cf_detail(AS_INFO_PORT, "Events %x on FD %d" , events[i].events, CSFD(&ips->sock)); |
264 | |
265 | if (events[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) { |
266 | cf_detail(AS_INFO_PORT, "Remote close on FD %d" , CSFD(&ips->sock)); |
267 | cf_poll_delete_socket(poll, &ips->sock); |
268 | info_port_state_free(ips); |
269 | continue; |
270 | } |
271 | |
272 | if ((events[i].events & EPOLLIN) != 0 && thr_info_port_readable(ips) < 0) { |
273 | cf_poll_delete_socket(poll, &ips->sock); |
274 | info_port_state_free(ips); |
275 | continue; |
276 | } |
277 | |
278 | if ((events[i].events & EPOLLOUT) != 0 && thr_info_port_writable(ips) < 0) { |
279 | cf_poll_delete_socket(poll, &ips->sock); |
280 | info_port_state_free(ips); |
281 | continue; |
282 | } |
283 | } |
284 | } |
285 | } |
286 | |
287 | return NULL; |
288 | } |
289 | |
290 | |
291 | void |
292 | as_info_port_start() |
293 | { |
294 | if (g_info_port == 0) { |
295 | return; |
296 | } |
297 | |
298 | cf_info(AS_INFO_PORT, "starting info port thread" ); |
299 | |
300 | cf_thread_create_detached(run_info_port, NULL); |
301 | |
302 | // For orderly startup log, wait for endpoint setup. |
303 | while (! g_started) { |
304 | usleep(1000); |
305 | } |
306 | } |
307 | |