1/*
2 * thr_info_port.c
3 *
4 * Copyright (C) 2008-2014 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23#include "base/thr_info_port.h"
24
25#include <errno.h>
26#include <stdbool.h>
27#include <stddef.h>
28#include <stdint.h>
29#include <string.h>
30#include <unistd.h>
31#include <sys/ioctl.h>
32
33#include "citrusleaf/alloc.h"
34#include "citrusleaf/cf_atomic.h"
35
36#include "cf_str.h"
37#include "cf_thread.h"
38#include "dynbuf.h"
39#include "fault.h"
40#include "socket.h"
41
42#include "base/cfg.h"
43#include "base/thr_info.h"
44
45#define POLL_SZ 1024
46
47// State for any open info port.
48typedef struct {
49 int recv_pos;
50 int recv_alloc;
51 uint8_t *recv_buf;
52
53 int xmit_pos; // where we're currently writing
54 int xmit_limit; // the end of the write buffer
55 int xmit_alloc;
56 uint8_t *xmit_buf;
57
58 cf_socket sock;
59
60} info_port_state;
61
62cf_serv_cfg g_info_bind = { .n_cfgs = 0 };
63cf_ip_port g_info_port = 0;
64
65static cf_sockets g_sockets;
66
67// Using int for 4-byte size, but maintaining bool semantics.
68static volatile int g_started = false;
69
70void
71info_port_state_free(info_port_state *ips)
72{
73 if (ips->recv_buf) cf_free(ips->recv_buf);
74 if (ips->xmit_buf) cf_free(ips->xmit_buf);
75 cf_socket_close(&ips->sock);
76 cf_socket_term(&ips->sock);
77 memset(ips, -1, sizeof(info_port_state));
78 cf_free(ips);
79}
80
81
82int
83thr_info_port_readable(info_port_state *ips)
84{
85 int sz = cf_socket_available(&ips->sock);
86
87 if (sz == 0) {
88 return 0;
89 }
90
91 // Make sure we've got some reasonable space in the read buffer.
92 if (ips->recv_alloc - ips->recv_pos < sz) {
93 int new_sz = sz + ips->recv_pos + 100;
94 ips->recv_buf = cf_realloc(ips->recv_buf, new_sz);
95 ips->recv_alloc = new_sz;
96 }
97
98 int n = cf_socket_recv(&ips->sock, ips->recv_buf + ips->recv_pos, ips->recv_alloc - ips->recv_pos, 0);
99 if (n < 0) {
100 if (errno != EAGAIN) {
101 cf_detail(AS_INFO_PORT, "info socket: read fail: error: rv %d sz was %d errno %d", n, ips->recv_alloc - ips->recv_pos, errno);
102 }
103 return -1;
104 }
105 ips->recv_pos += n;
106
107 // What about a control-c?
108 if (-1 != cf_str_strnchr(ips->recv_buf, ips->recv_pos, 0xFF)) {
109 cf_debug(AS_INFO_PORT, "recived a control c, aborting");
110 return -1;
111 }
112
113 // See if we've got a CR or LF in the buf yet.
114 int cr = cf_str_strnchr(ips->recv_buf, ips->recv_pos, '\r');
115 int lf = cf_str_strnchr(ips->recv_buf, ips->recv_pos, '\n');
116 if ((cr >= 0) || (lf >= 0)) {
117 size_t len;
118 // Take the closest of cr or lf.
119 if (-1 == lf) {
120 len = cr;
121 }
122 else if (-1 == cr) {
123 len = lf;
124 }
125 else {
126 len = lf < cr ? lf : cr;
127 }
128
129 // We have a message. Process it.
130 cf_dyn_buf_define(db);
131
132 ips->recv_buf[len] = '\n';
133 len++;
134
135 // Fill out the db buffer with the response (always returns 0).
136 as_info_buffer(ips->recv_buf, len, &db);
137 if (db.used_sz == 0) cf_dyn_buf_append_char(&db, '\n');
138
139 // See if it has a tab, get that location. It probably does.
140 int tab = cf_str_strnchr(db.buf, db.used_sz , '\t');
141 tab++;
142
143 while (len < ips->recv_pos &&
144 ((ips->recv_buf[len] == '\r') || (ips->recv_buf[len] == '\n'))) {
145
146 len ++ ;
147 }
148
149 // Move transmit buffer forward.
150 if (ips->recv_pos - len > 0) {
151 memmove(ips->recv_buf, ips->recv_buf + len, ips->recv_pos - len);
152 ips->recv_pos -= len;
153 }
154 else {
155 ips->recv_pos = 0;
156 }
157
158 // Queue the response - set to the xmit buf.
159 if (ips->xmit_alloc - ips->xmit_limit < db.used_sz) {
160 ips->xmit_buf = cf_realloc(ips->xmit_buf, db.used_sz + ips->xmit_limit);
161 ips->xmit_alloc = db.used_sz + ips->xmit_limit;
162 }
163 memcpy(ips->xmit_buf + ips->xmit_limit, db.buf + tab, db.used_sz - tab);
164 ips->xmit_limit += db.used_sz - tab;
165
166 cf_dyn_buf_free(&db);
167 }
168
169 return 0;
170}
171
172
173int
174thr_info_port_writable(info_port_state *ips)
175{
176 // Do we have bytes to write?
177 if (ips->xmit_limit > 0) {
178
179 // Write them!
180 int rv = cf_socket_send(&ips->sock, ips->xmit_buf + ips->xmit_pos, ips->xmit_limit - ips->xmit_pos , MSG_NOSIGNAL);
181 if (rv < 0) {
182 if (errno != EAGAIN) {
183 return -1;
184 }
185 }
186 else if (rv == 0) {
187 cf_debug(AS_INFO_PORT, "send with return value 0");
188 return 0;
189 }
190 else {
191 ips->xmit_pos += rv;
192 if (ips->xmit_pos == ips->xmit_limit) {
193 ips->xmit_pos = ips->xmit_limit = 0;
194 }
195 }
196 }
197
198 return 0;
199}
200
201
202// Demarshal info socket connections.
203void *
204run_info_port(void *arg)
205{
206 cf_poll poll;
207 cf_debug(AS_INFO_PORT, "Info port process started");
208
209 // Start the listener socket. Note that because this is done after privilege
210 // de-escalation, we can't use privileged ports.
211
212 if (cf_socket_init_server(&g_info_bind, &g_sockets) < 0) {
213 cf_crash(AS_INFO_PORT, "Couldn't initialize service sockets");
214 }
215
216 cf_poll_create(&poll);
217 cf_poll_add_sockets(poll, &g_sockets, EPOLLIN | EPOLLERR | EPOLLHUP);
218 cf_socket_show_server(AS_INFO_PORT, "info", &g_sockets);
219
220 g_started = true;
221
222 while (true) {
223 cf_poll_event events[POLL_SZ];
224 int32_t n_ev = cf_poll_wait(poll, events, POLL_SZ, -1);
225
226 for (int32_t i = 0; i < n_ev; ++i) {
227 cf_socket *ssock = events[i].data;
228
229 if (cf_sockets_has_socket(&g_sockets, ssock)) {
230 cf_socket csock;
231 cf_sock_addr addr;
232
233 if (cf_socket_accept(ssock, &csock, &addr) < 0) {
234 // This means we're out of file descriptors.
235 if (errno == EMFILE) {
236 cf_warning(AS_INFO_PORT, "Too many file descriptors in use, consider raising limit");
237 continue;
238 }
239
240 cf_crash(AS_INFO_PORT, "cf_socket_accept() failed");
241 }
242
243 cf_detail(AS_INFO_PORT, "New connection: %s", cf_sock_addr_print(&addr));
244 info_port_state *ips = cf_malloc(sizeof(info_port_state));
245
246 ips->recv_pos = 0;
247 ips->recv_alloc = 100;
248 ips->recv_buf = cf_malloc(100);
249 ips->xmit_limit = ips->xmit_pos = 0;
250 ips->xmit_alloc = 100;
251 ips->xmit_buf = cf_malloc(100);
252 cf_socket_copy(&csock, &ips->sock);
253
254 cf_poll_add_socket(poll, &csock, EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP, ips);
255 }
256 else {
257 info_port_state *ips = events[i].data;
258
259 if (ips == NULL) {
260 cf_crash(AS_INFO_PORT, "Event with null handle");
261 }
262
263 cf_detail(AS_INFO_PORT, "Events %x on FD %d", events[i].events, CSFD(&ips->sock));
264
265 if (events[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {
266 cf_detail(AS_INFO_PORT, "Remote close on FD %d", CSFD(&ips->sock));
267 cf_poll_delete_socket(poll, &ips->sock);
268 info_port_state_free(ips);
269 continue;
270 }
271
272 if ((events[i].events & EPOLLIN) != 0 && thr_info_port_readable(ips) < 0) {
273 cf_poll_delete_socket(poll, &ips->sock);
274 info_port_state_free(ips);
275 continue;
276 }
277
278 if ((events[i].events & EPOLLOUT) != 0 && thr_info_port_writable(ips) < 0) {
279 cf_poll_delete_socket(poll, &ips->sock);
280 info_port_state_free(ips);
281 continue;
282 }
283 }
284 }
285 }
286
287 return NULL;
288}
289
290
291void
292as_info_port_start()
293{
294 if (g_info_port == 0) {
295 return;
296 }
297
298 cf_info(AS_INFO_PORT, "starting info port thread");
299
300 cf_thread_create_detached(run_info_port, NULL);
301
302 // For orderly startup log, wait for endpoint setup.
303 while (! g_started) {
304 usleep(1000);
305 }
306}
307