1 | /* |
2 | * service.c |
3 | * |
4 | * Copyright (C) 2018 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "base/service.h" |
28 | |
29 | #include <errno.h> |
30 | #include <sched.h> |
31 | #include <stdbool.h> |
32 | #include <stddef.h> |
33 | #include <stdint.h> |
34 | #include <sys/epoll.h> |
35 | #include <sys/resource.h> |
36 | #include <sys/time.h> |
37 | #include <unistd.h> |
38 | #include <zlib.h> |
39 | |
40 | #include "citrusleaf/alloc.h" |
41 | #include "citrusleaf/cf_atomic.h" |
42 | #include "citrusleaf/cf_clock.h" |
43 | #include "citrusleaf/cf_queue.h" |
44 | |
45 | #include "cf_mutex.h" |
46 | #include "cf_thread.h" |
47 | #include "epoll_queue.h" |
48 | #include "fault.h" |
49 | #include "hardware.h" |
50 | #include "hist.h" |
51 | #include "socket.h" |
52 | #include "tls.h" |
53 | |
54 | #include "base/batch.h" |
55 | #include "base/cfg.h" |
56 | #include "base/datamodel.h" |
57 | #include "base/proto.h" |
58 | #include "base/security.h" |
59 | #include "base/stats.h" |
60 | #include "base/thr_info.h" |
61 | #include "base/thr_tsvc.h" |
62 | #include "base/transaction.h" |
63 | #include "base/xdr_serverside.h" |
64 | |
65 | #include "warnings.h" |
66 | |
67 | |
68 | //========================================================== |
69 | // Typedefs & constants. |
70 | // |
71 | |
// Capacity of the event array passed to each cf_poll_wait() call in the accept
// and service threads.
#define N_EVENTS 1024

// Buffer sizes applied by config_xdr_socket() to connections that carry XDR
// transactions.
#define XDR_WRITE_BUFFER_SIZE (5 * 1024 * 1024)
#define XDR_READ_BUFFER_SIZE (15 * 1024 * 1024)

// Per service thread state - allocated in create_service_thread(), handed to
// run_service() as thread argument, and freed in stop_service().
typedef struct thread_ctx_s {
	// CPU the thread pins itself to - only used when CPU pinning is configured.
	cf_topo_cpu_index i_cpu;
	// Points to this thread's entry in g_thread_locks.
	cf_mutex* lock;
	// The poll instance this thread waits on.
	cf_poll poll;
	// Queue of internally enqueued transactions - its event fd is added to poll.
	cf_epoll_queue trans_q;
} thread_ctx;
83 | |
84 | |
85 | //========================================================== |
86 | // Globals. |
87 | // |
88 | |
// Access addresses and ports of the plain, alternate, TLS and alternate TLS
// services. All of them start out empty (no addresses, port 0).
as_service_access g_access = {
		.service = { .addrs = { .n_addrs = 0 }, .port = 0 },
		.alt_service = { .addrs = { .n_addrs = 0 }, .port = 0 },
		.tls_service = { .addrs = { .n_addrs = 0 }, .port = 0 },
		.alt_tls_service = { .addrs = { .n_addrs = 0 }, .port = 0 }
};

// Listening socket configurations - as_service_start() may add localhost
// entries before creating the listening sockets from them.
cf_serv_cfg g_service_bind = { .n_cfgs = 0 };
// Handed to tls_socket_prepare_server() for connections accepted on TLS service
// sockets.
cf_tls_info* g_service_tls;

// Listening sockets - created in as_service_start(), polled by run_accept().
static cf_sockets g_sockets;

// Both indexed by service thread id (sid). g_thread_ctxs[sid] is NULL while no
// thread is registered for sid, and is accessed with g_thread_locks[sid] held.
static cf_mutex g_thread_locks[MAX_SERVICE_THREADS];
static thread_ctx* g_thread_ctxs[MAX_SERVICE_THREADS];

// Table of open connections, shared between the accept, service and reaper
// threads. After start_reaper() has set it up, it is accessed with
// g_reaper_lock held.
static cf_mutex g_reaper_lock = CF_MUTEX_INIT;
static uint32_t g_n_slots; // number of entries in g_file_handles
static as_file_handle** g_file_handles; // NULL entries are unused slots
static cf_queue g_free_slots; // indexes of unused g_file_handles slots
108 | |
109 | |
110 | //========================================================== |
111 | // Forward declarations. |
112 | // |
113 | |
// Setup - service thread creation and listening address configuration.
static void create_service_thread(uint32_t sid);
static void add_localhost(cf_serv_cfg* serv_cfg, cf_sock_owner owner);

// Accept client connections - entry point of the accept thread.
static void* run_accept(void* udata);

// Assign client connections to threads - the select_sid*() functions pick the
// service thread id (sid), assign_socket() registers the socket with it.
static void assign_socket(as_file_handle* fd_h, uint32_t events);
static uint32_t select_sid(void);
static uint32_t select_sid_pinned(cf_topo_cpu_index i_cpu);
static uint32_t select_sid_adq(cf_topo_napi_id id);
static void schedule_redistribution(void);

// Demarshal client requests - run_service() is the entry point of each service
// thread.
static void* run_service(void* udata);
static void stop_service(thread_ctx* ctx);
static void service_release_file_handle(as_file_handle* fd_h);
static bool process_readable(as_file_handle* fd_h);
static void start_transaction(as_file_handle* fd_h);
static bool decompress_msg(as_comp_proto* cproto, uint8_t** out_buf, uint64_t* out_buf_sz);
static void config_xdr_socket(cf_socket* sock);

// Reap idle and bad connections - run_reaper() is the entry point of the reaper
// thread.
static void start_reaper(void);
static void* run_reaper(void* udata);

// Transaction queue - processes transactions enqueued internally via
// as_service_enqueue_internal().
static bool start_internal_transaction(thread_ctx* ctx);
143 | |
144 | |
145 | //========================================================== |
146 | // Public API. |
147 | // |
148 | |
149 | void |
150 | as_service_init(void) |
151 | { |
152 | // Create epoll instances and service threads. |
153 | |
154 | cf_info(AS_SERVICE, "starting %u service threads" , |
155 | g_config.n_service_threads); |
156 | |
157 | for (uint32_t i = 0; i < MAX_SERVICE_THREADS; i++) { |
158 | cf_mutex_init(&g_thread_locks[i]); |
159 | } |
160 | |
161 | for (uint32_t i = 0; i < g_config.n_service_threads; i++) { |
162 | create_service_thread(i); |
163 | } |
164 | } |
165 | |
166 | void |
167 | as_service_start(void) |
168 | { |
169 | start_reaper(); |
170 | |
171 | // Create listening sockets. |
172 | |
173 | add_localhost(&g_service_bind, CF_SOCK_OWNER_SERVICE); |
174 | add_localhost(&g_service_bind, CF_SOCK_OWNER_SERVICE_TLS); |
175 | |
176 | as_xdr_info_port(&g_service_bind); |
177 | |
178 | if (cf_socket_init_server(&g_service_bind, &g_sockets) < 0) { |
179 | cf_crash(AS_SERVICE, "couldn't initialize service socket" ); |
180 | } |
181 | |
182 | cf_socket_show_server(AS_SERVICE, "client" , &g_sockets); |
183 | |
184 | // Create accept thread. |
185 | |
186 | cf_info(AS_SERVICE, "starting accept thread" ); |
187 | |
188 | cf_thread_create_detached(run_accept, NULL); |
189 | } |
190 | |
191 | void |
192 | as_service_set_threads(uint32_t n_threads) |
193 | { |
194 | uint32_t old_n_threads = g_config.n_service_threads; |
195 | |
196 | if (n_threads > old_n_threads) { |
197 | for (uint32_t sid = old_n_threads; sid < n_threads; sid++) { |
198 | create_service_thread(sid); |
199 | } |
200 | |
201 | g_config.n_service_threads = n_threads; |
202 | |
203 | schedule_redistribution(); |
204 | } |
205 | else if (n_threads < old_n_threads) { |
206 | g_config.n_service_threads = n_threads; |
207 | |
208 | for (uint32_t sid = n_threads; sid < old_n_threads; sid++) { |
209 | cf_mutex_lock(&g_thread_locks[sid]); |
210 | |
211 | thread_ctx* ctx = g_thread_ctxs[sid]; |
212 | |
213 | cf_detail(AS_SERVICE, "sending terminator sid %u ctx %p" , sid, ctx); |
214 | |
215 | as_transaction tr = { .msgp = NULL }; |
216 | |
217 | cf_epoll_queue_push(&ctx->trans_q, &tr); |
218 | g_thread_ctxs[sid] = NULL; |
219 | |
220 | cf_mutex_unlock(&g_thread_locks[sid]); |
221 | } |
222 | } |
223 | } |
224 | |
225 | void |
226 | as_service_rearm(as_file_handle* fd_h) |
227 | { |
228 | cf_poll_modify_socket(fd_h->poll, &fd_h->sock, |
229 | EPOLLIN | EPOLLONESHOT | EPOLLRDHUP, fd_h); |
230 | } |
231 | |
232 | void |
233 | as_service_enqueue_internal(as_transaction* tr) |
234 | { |
235 | while (true) { |
236 | uint32_t sid = as_config_is_cpu_pinned() ? |
237 | select_sid_pinned(cf_topo_current_cpu()) : select_sid(); |
238 | |
239 | cf_mutex_lock(&g_thread_locks[sid]); |
240 | |
241 | thread_ctx* ctx = g_thread_ctxs[sid]; |
242 | |
243 | if (ctx != NULL) { |
244 | cf_epoll_queue_push(&ctx->trans_q, tr); |
245 | cf_mutex_unlock(&g_thread_locks[sid]); |
246 | break; |
247 | } |
248 | |
249 | cf_mutex_unlock(&g_thread_locks[sid]); |
250 | } |
251 | } |
252 | |
253 | |
254 | //========================================================== |
255 | // Local helpers - setup. |
256 | // |
257 | |
258 | void |
259 | create_service_thread(uint32_t sid) |
260 | { |
261 | thread_ctx* ctx = cf_malloc(sizeof(thread_ctx)); |
262 | |
263 | cf_detail(AS_SERVICE, "starting sid %u ctx %p" , sid, ctx); |
264 | |
265 | if (as_config_is_cpu_pinned()) { |
266 | ctx->i_cpu = (cf_topo_cpu_index)(sid % cf_topo_count_cpus()); |
267 | } |
268 | |
269 | ctx->lock = &g_thread_locks[sid]; |
270 | cf_poll_create(&ctx->poll); |
271 | cf_epoll_queue_init(&ctx->trans_q, AS_TRANSACTION_HEAD_SIZE, 64); |
272 | |
273 | cf_thread_create_detached(run_service, ctx); |
274 | |
275 | cf_mutex_lock(&g_thread_locks[sid]); |
276 | |
277 | g_thread_ctxs[sid] = ctx; |
278 | |
279 | cf_mutex_unlock(&g_thread_locks[sid]); |
280 | } |
281 | |
282 | static void |
283 | add_localhost(cf_serv_cfg* serv_cfg, cf_sock_owner owner) |
284 | { |
285 | // Localhost will only be added to the addresses, if we're not yet listening |
286 | // on wildcard ("any") or localhost. |
287 | |
288 | cf_ip_port port = 0; |
289 | |
290 | for (uint32_t i = 0; i < serv_cfg->n_cfgs; i++) { |
291 | if (serv_cfg->cfgs[i].owner != owner) { |
292 | continue; |
293 | } |
294 | |
295 | port = serv_cfg->cfgs[i].port; |
296 | |
297 | if (cf_ip_addr_is_any(&serv_cfg->cfgs[i].addr) || |
298 | cf_ip_addr_is_local(&serv_cfg->cfgs[i].addr)) { |
299 | return; |
300 | } |
301 | } |
302 | |
303 | if (port == 0) { |
304 | return; |
305 | } |
306 | |
307 | cf_sock_cfg sock_cfg; |
308 | |
309 | cf_sock_cfg_init(&sock_cfg, owner); |
310 | sock_cfg.port = port; |
311 | cf_ip_addr_set_local(&sock_cfg.addr); |
312 | |
313 | if (cf_serv_cfg_add_sock_cfg(serv_cfg, &sock_cfg) < 0) { |
314 | cf_crash(AS_SERVICE, "couldn't add localhost listening address" ); |
315 | } |
316 | } |
317 | |
318 | |
319 | //========================================================== |
320 | // Local helpers - accept client connections. |
321 | // |
322 | |
// Entry point of the accept thread. Polls the listening sockets, accepts
// incoming connections, wraps each one in a reference counted as_file_handle,
// registers the handle in the connection table (g_file_handles), and assigns
// it to a service thread. Loops forever - udata is unused.
static void*
run_accept(void* udata)
{
	(void)udata;

	cf_poll poll;
	cf_poll_create(&poll);

	cf_poll_add_sockets(poll, &g_sockets, EPOLLIN);

	while (true) {
		cf_poll_event events[N_EVENTS];
		int32_t n_events = cf_poll_wait(poll, events, N_EVENTS, -1);

		cf_assert(n_events >= 0, AS_SERVICE, "unexpected EINTR");

		for (uint32_t i = 0; i < (uint32_t)n_events; i++) {
			// The event data is the listening socket that became readable.
			cf_socket* ssock = events[i].data;
			cf_socket csock;
			cf_sock_addr caddr;

			if (cf_socket_accept(ssock, &csock, &caddr) < 0) {
				// Running out of file descriptors is tolerated - any other
				// accept failure is fatal.
				if (errno == EMFILE || errno == ENFILE) {
					cf_ticker_warning(AS_SERVICE, "out of file descriptors");
					continue;
				}

				cf_crash(AS_SERVICE, "accept() failed: %d (%s)", errno,
						cf_strerror(errno));
			}

			cf_sock_cfg* cfg = ssock->cfg;

			// Ensure that proto_connections_closed is read first.
			uint64_t n_closed = g_stats.proto_connections_closed;
			uint64_t n_opened = g_stats.proto_connections_opened;
			uint64_t n_open = n_opened - n_closed;

			// Enforce the connection limit - connections accepted on XDR
			// sockets are exempt.
			// TODO - XDR exemption to become a special feature.
			if (n_open >= g_config.n_proto_fd_max &&
					cfg->owner != CF_SOCK_OWNER_XDR) {
				cf_ticker_warning(AS_SERVICE,
						"refusing client connection - proto-fd-max %u",
						g_config.n_proto_fd_max);

				cf_socket_close(&csock);
				cf_socket_term(&csock);
				continue;
			}

			if (cfg->owner == CF_SOCK_OWNER_SERVICE_TLS) {
				tls_socket_prepare_server(g_service_tls, &csock);
			}

			as_file_handle* fd_h = cf_rc_alloc(sizeof(as_file_handle));
			// Ref for epoll instance.

			cf_sock_addr_to_string_safe(&caddr, fd_h->client,
					sizeof(fd_h->client));
			cf_socket_copy(&csock, &fd_h->sock);

			// Start out idle, expecting a fresh proto header.
			fd_h->last_used = cf_getns();
			fd_h->in_transaction = 0;
			fd_h->move_me = false;
			fd_h->reap_me = false;
			fd_h->is_xdr = false;
			fd_h->proto = NULL;
			fd_h->proto_unread = sizeof(as_proto);
			fd_h->security_filter = as_security_filter_create();

			cf_rc_reserve(fd_h); // ref for reaper

			// Register the handle in the connection table, so that the reaper
			// sees it.
			cf_mutex_lock(&g_reaper_lock);

			uint32_t slot;

			if (cf_queue_pop(&g_free_slots, &slot, CF_QUEUE_NOWAIT) !=
					CF_QUEUE_OK) {
				cf_crash(AS_SERVICE, "cannot get free slot");
			}

			g_file_handles[slot] = fd_h;

			cf_mutex_unlock(&g_reaper_lock);

			assign_socket(fd_h, EPOLLIN); // needs to be armed (EPOLLIN)

			cf_atomic64_incr(&g_stats.proto_connections_opened);
		}
	}

	return NULL;
}
416 | |
417 | |
418 | //========================================================== |
419 | // Local helpers - assign client connections to threads. |
420 | // |
421 | |
422 | static void |
423 | assign_socket(as_file_handle* fd_h, uint32_t events) |
424 | { |
425 | while (true) { |
426 | uint32_t sid; |
427 | |
428 | switch (g_config.auto_pin) { |
429 | case CF_TOPO_AUTO_PIN_NONE: |
430 | sid = select_sid(); |
431 | break; |
432 | case CF_TOPO_AUTO_PIN_CPU: |
433 | case CF_TOPO_AUTO_PIN_NUMA: |
434 | sid = select_sid_pinned(cf_topo_socket_cpu(&fd_h->sock)); |
435 | break; |
436 | case CF_TOPO_AUTO_PIN_ADQ: |
437 | sid = select_sid_adq(cf_topo_socket_napi_id(&fd_h->sock)); |
438 | break; |
439 | default: |
440 | cf_crash(AS_SERVICE, "bad auto-pin %d" , g_config.auto_pin); |
441 | return; |
442 | } |
443 | |
444 | cf_mutex_lock(&g_thread_locks[sid]); |
445 | |
446 | thread_ctx* ctx = g_thread_ctxs[sid]; |
447 | |
448 | if (ctx != NULL) { |
449 | fd_h->poll = ctx->poll; |
450 | |
451 | cf_poll_add_socket(fd_h->poll, &fd_h->sock, |
452 | events | EPOLLONESHOT | EPOLLRDHUP, fd_h); |
453 | |
454 | cf_mutex_unlock(&g_thread_locks[sid]); |
455 | break; |
456 | } |
457 | |
458 | cf_mutex_unlock(&g_thread_locks[sid]); |
459 | } |
460 | } |
461 | |
462 | static uint32_t |
463 | select_sid(void) |
464 | { |
465 | static uint32_t rr = 0; |
466 | |
467 | return rr++ % g_config.n_service_threads; |
468 | } |
469 | |
470 | static uint32_t |
471 | select_sid_pinned(cf_topo_cpu_index i_cpu) |
472 | { |
473 | static uint32_t rr[CPU_SETSIZE] = { 0 }; |
474 | |
475 | uint16_t n_cpus = cf_topo_count_cpus(); |
476 | uint32_t threads_per_cpu = g_config.n_service_threads / n_cpus; |
477 | |
478 | uint32_t thread_ix = rr[i_cpu]++ % threads_per_cpu; |
479 | |
480 | return (thread_ix * n_cpus) + i_cpu; |
481 | } |
482 | |
483 | static uint32_t |
484 | select_sid_adq(cf_topo_napi_id id) |
485 | { |
486 | return id == 0 ? select_sid() : id % g_config.n_service_threads; |
487 | } |
488 | |
489 | static void |
490 | schedule_redistribution(void) |
491 | { |
492 | cf_mutex_lock(&g_reaper_lock); |
493 | |
494 | uint32_t n_remaining = g_n_slots - (uint32_t)cf_queue_sz(&g_free_slots); |
495 | |
496 | for (uint32_t i = 0; n_remaining != 0; i++) { |
497 | as_file_handle* fd_h = g_file_handles[i]; |
498 | |
499 | if (fd_h != NULL) { |
500 | fd_h->move_me = true; |
501 | n_remaining--; |
502 | } |
503 | } |
504 | |
505 | cf_mutex_unlock(&g_reaper_lock); |
506 | } |
507 | |
508 | |
509 | //========================================================== |
510 | // Local helpers - demarshal client requests. |
511 | // |
512 | |
// Entry point of a service thread - udata is the thread's thread_ctx. Waits on
// the thread's poll instance, which reports two kinds of events:
// - The internal transaction queue's event fd (event data == trans_q). One
//   queued transaction is processed per event. A terminator makes the thread
//   hand over its connections (stop_service()) and exit.
// - A client socket (event data == its as_file_handle). The thread continues
//   the TLS handshake if needed, reads request data, and starts a transaction
//   once a request is complete.
static void*
run_service(void* udata)
{
	thread_ctx* ctx = (thread_ctx*)udata;

	cf_detail(AS_SERVICE, "running ctx %p", ctx);

	if (as_config_is_cpu_pinned()) {
		cf_topo_pin_to_cpu(ctx->i_cpu);
	}

	cf_poll poll = ctx->poll;
	cf_epoll_queue* trans_q = &ctx->trans_q;

	// The queue pointer doubles as event data, to tell queue events from
	// socket events below.
	cf_poll_add_fd(poll, trans_q->event_fd, EPOLLIN, trans_q);

	while (true) {
		cf_poll_event events[N_EVENTS];
		int32_t n_events = cf_poll_wait(poll, events, N_EVENTS, -1);

		cf_assert(n_events >= 0, AS_SERVICE, "unexpected EINTR");

		for (uint32_t i = 0; i < (uint32_t)n_events; i++) {
			if (events[i].data == trans_q) {
				cf_assert(events[i].events == EPOLLIN, AS_SERVICE,
						"unexpected event: 0x%0x", events[i].events);

				if (start_internal_transaction(ctx)) {
					continue;
				}

				// Got the terminator - stop_service() frees ctx, so it must
				// not be touched any more.
				stop_service(ctx);

				return NULL;
			}

			as_file_handle* fd_h = events[i].data;

			// Peer hung up or socket error - give up the connection.
			if ((events[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) != 0) {
				service_release_file_handle(fd_h);
				continue;
			}

			if (tls_socket_needs_handshake(&fd_h->sock)) {
				int32_t tls_ev = tls_socket_accept(&fd_h->sock);

				if (tls_ev == EPOLLERR) {
					service_release_file_handle(fd_h);
					continue;
				}

				// A result of 0 is followed by waiting for the first request.
				// Otherwise tls_ev holds the events to rearm with.
				if (tls_ev == 0) {
					tls_socket_must_not_have_data(&fd_h->sock,
							"service handshake");
					tls_ev = EPOLLIN;
				}

				cf_poll_modify_socket(fd_h->poll, &fd_h->sock,
						(uint32_t)tls_ev | EPOLLONESHOT | EPOLLRDHUP, fd_h);
				continue;
			}

			// Nothing of the next request has been read yet.
			if (fd_h->proto == NULL && fd_h->proto_unread == sizeof(as_proto)) {
				fd_h->last_used = cf_getns(); // request start time - for now
			}

			if (! process_readable(fd_h)) {
				service_release_file_handle(fd_h);
				continue;
			}

			tls_socket_must_not_have_data(&fd_h->sock, "full client read");

			// Request is still incomplete - wait for more data.
			if (fd_h->proto_unread != 0) {
				as_service_rearm(fd_h);
				continue;
			}

			// Redistribution was scheduled - move the connection to a freshly
			// selected service thread before starting the transaction.
			if (fd_h->move_me) {
				cf_poll_delete_socket(fd_h->poll, &fd_h->sock);
				assign_socket(fd_h, 0); // known to be unarmed (no EPOLLIN)

				fd_h->move_me = false;
			}

			// Note that epoll cannot trigger again for this file handle during
			// the transaction. We'll rearm at the end of the transaction.
			start_transaction(fd_h);
		}
	}

	return NULL;
}
606 | |
607 | static void |
608 | stop_service(thread_ctx* ctx) |
609 | { |
610 | cf_detail(AS_SERVICE, "stopping ctx %p" , ctx); |
611 | |
612 | while (true) { |
613 | bool any_in_transaction = false; |
614 | |
615 | cf_mutex_lock(&g_reaper_lock); |
616 | |
617 | uint32_t n_remaining = g_n_slots - (uint32_t)cf_queue_sz(&g_free_slots); |
618 | |
619 | for (uint32_t i = 0; n_remaining != 0; i++) { |
620 | as_file_handle* fd_h = g_file_handles[i]; |
621 | |
622 | if (fd_h == NULL) { |
623 | continue; |
624 | } |
625 | |
626 | n_remaining--; |
627 | |
628 | // Ignore, if another thread's or INVALID_POLL. |
629 | if (! cf_poll_equal(fd_h->poll, ctx->poll)) { |
630 | continue; |
631 | } |
632 | |
633 | // Don't transfer during TLS handshake - might need EPOLLOUT. |
634 | if (tls_socket_needs_handshake(&fd_h->sock)) { |
635 | service_release_file_handle(fd_h); |
636 | continue; |
637 | } |
638 | |
639 | if (fd_h->in_transaction != 0) { |
640 | any_in_transaction = true; |
641 | continue; |
642 | } |
643 | |
644 | cf_poll_delete_socket(fd_h->poll, &fd_h->sock); |
645 | assign_socket(fd_h, EPOLLIN); // known to be armed (EPOLLIN) |
646 | } |
647 | |
648 | cf_mutex_unlock(&g_reaper_lock); |
649 | |
650 | if (! any_in_transaction) { |
651 | break; |
652 | } |
653 | |
654 | sleep(1); |
655 | } |
656 | |
657 | cf_poll_destroy(ctx->poll); |
658 | cf_epoll_queue_destroy(&ctx->trans_q); |
659 | |
660 | cf_free(ctx); |
661 | |
662 | cf_detail(AS_SERVICE, "stopped ctx %p" , ctx); |
663 | } |
664 | |
// Give up a connection from within a service thread. The socket is removed
// from its poll instance, the handle is marked for the reaper (reap_me), and
// as_release_file_handle() is called on it (presumably dropping the poll
// instance's reference - confirm against as_release_file_handle()).
static void
service_release_file_handle(as_file_handle* fd_h)
{
	cf_poll_delete_socket(fd_h->poll, &fd_h->sock);
	// With INVALID_POLL, stop_service() no longer matches this handle to any
	// thread's poll instance.
	fd_h->poll = INVALID_POLL;
	// Makes run_reaper() free the handle's slot in the connection table.
	fd_h->reap_me = true;
	as_release_file_handle(fd_h);
}
673 | |
// Read as much of the current request as the socket provides. The read state
// lives in the file handle:
// - proto == NULL: the header is being read into proto_hdr.
// - proto != NULL: the body is being read into the allocated proto.
// - proto_unread: number of bytes still missing from the current part.
//
// Returns true if the request is complete (proto_unread == 0) or if the socket
// ran dry (EAGAIN / EWOULDBLOCK) before that. Returns false if the connection
// was closed by the peer, on any other receive error, or if the header is
// invalid - the caller then releases the connection.
//
// NOTE(review): when the header announces a zero-length body, proto_unread
// becomes 0 and the loop issues a zero-length cf_socket_recv() - confirm how
// cf_socket_recv() reports that case.
static bool
process_readable(as_file_handle* fd_h)
{
	// One past the last byte of the part being read - the write position is
	// always end - proto_unread.
	uint8_t* end = fd_h->proto == NULL ?
			(uint8_t*)&fd_h->proto_hdr + sizeof(as_proto) : // header
			fd_h->proto->body + fd_h->proto->sz; // body

	while (true) {
		int32_t sz = cf_socket_recv(&fd_h->sock, end - fd_h->proto_unread,
				fd_h->proto_unread, 0);

		if (sz < 0) {
			// No more data for now is fine - anything else is an error.
			return errno == EAGAIN || errno == EWOULDBLOCK;
		}

		if (sz == 0) {
			return false;
		}

		fd_h->proto_unread -= (uint64_t)sz;

		if (fd_h->proto_unread != 0) {
			continue; // drain socket (and OpenSSL's internal buffer) dry
		}

		if (fd_h->proto != NULL) {
			return true; // done with entire request
		}
		// else - switch from header to body.

		// Check for a TLS ClientHello arriving at a non-TLS socket. Heuristic:
		// - tls[0] == ContentType.handshake (22)
		// - tls[1] == ProtocolVersion.major (3)
		// - tls[5] == HandshakeType.client_hello (1)

		uint8_t* tls = (uint8_t*)&fd_h->proto_hdr;

		if (tls[0] == 22 && tls[1] == 3 && tls[5] == 1) {
			cf_warning(AS_SERVICE, "ignoring TLS connection from %s",
					fd_h->client);
			return false;
		}

		// For backward compatibility, allow version 0 with security messages.
		if (fd_h->proto_hdr.version != PROTO_VERSION &&
				! (fd_h->proto_hdr.version == 0 &&
						fd_h->proto_hdr.type == PROTO_TYPE_SECURITY)) {
			cf_warning(AS_SERVICE, "unsupported proto version %d from %s",
					fd_h->proto_hdr.version, fd_h->client);
			return false;
		}

		if (! as_proto_is_valid_type(&fd_h->proto_hdr)) {
			cf_warning(AS_SERVICE, "unsupported proto type %d from %s",
					fd_h->proto_hdr.type, fd_h->client);
			return false;
		}

		// The size is only checked after the swap.
		as_proto_swap(&fd_h->proto_hdr);

		if (fd_h->proto_hdr.sz > PROTO_SIZE_MAX) {
			cf_warning(AS_SERVICE, "invalid proto size %lu from %s",
					(uint64_t)fd_h->proto_hdr.sz, fd_h->client);
			return false;
		}

		// Allocate header plus body, and carry over the header read so far.
		fd_h->proto = cf_malloc(sizeof(as_proto) + fd_h->proto_hdr.sz);
		memcpy(fd_h->proto, &fd_h->proto_hdr, sizeof(as_proto));

		fd_h->proto_unread = fd_h->proto->sz;
		end = fd_h->proto->body + fd_h->proto->sz;
	}
}
747 | |
// Start processing the complete request held by the file handle. The request
// is detached from the handle (which is reset to expect a new header) and
// dispatched by proto type: info requests go to as_info(), security requests
// to as_security_transact(), compressed messages are decompressed first, batch
// requests are queued via as_batch_queue_task(), and everything else goes to
// as_tsvc_process_transaction().
static void
start_transaction(as_file_handle* fd_h)
{
	// as_end_of_transaction() rearms then decrements, so this may be > 1.
	as_incr_uint32(&fd_h->in_transaction);

	uint64_t start_ns = fd_h->last_used;
	as_proto* proto = fd_h->proto;

	// Reset the read state - the next read starts with a fresh header.
	fd_h->proto = NULL;
	fd_h->proto_unread = sizeof(as_proto);

	if (proto->type == PROTO_TYPE_INFO) {
		as_info_transaction it = {
				.fd_h = fd_h,
				.proto = proto,
				.start_time = start_ns
		};

		as_info(&it);
		return;
	}

	as_transaction tr;
	as_transaction_init_head(&tr, NULL, (cl_msg*)proto);

	tr.origin = FROM_CLIENT;
	tr.from.proto_fd_h = fd_h;
	tr.start_time = start_ns;

	if (proto->type == PROTO_TYPE_SECURITY) {
		as_security_transact(&tr);
		return;
	}

	if (proto->type == PROTO_TYPE_AS_MSG_COMPRESSED) {
		uint8_t* buf = NULL;
		uint64_t buf_sz = 0;

		if (! decompress_msg((as_comp_proto*)proto, &buf, &buf_sz)) {
			as_transaction_demarshal_error(&tr, AS_ERR_UNKNOWN);
			return;
		}

		// The compressed message is no longer needed - from here on the
		// transaction refers to the decompressed buffer.
		cf_free(proto);

		proto = (as_proto*)buf;
		tr.msgp = (cl_msg*)proto;

		// The wrapped proto header is swapped before it is validated.
		as_proto_swap(proto);

		if (! as_proto_wrapped_is_valid(proto, buf_sz)) {
			cf_warning(AS_SERVICE, "decompressed proto: (%d,%d,%lu,%lu)",
					proto->version, proto->type, (uint64_t)proto->sz, buf_sz);
			as_transaction_demarshal_error(&tr, AS_ERR_UNKNOWN);
			return;
		}
	}

	// Configure the socket for XDR on the connection's first XDR transaction.
	if (as_transaction_is_xdr(&tr) && ! fd_h->is_xdr) {
		config_xdr_socket(&fd_h->sock);
		fd_h->is_xdr = true;
	}

	if (g_config.svc_benchmarks_enabled) {
		tr.benchmark_time = histogram_insert_data_point(
				g_stats.svc_demarshal_hist, start_ns);
	}

	if (tr.msgp->msg.info1 & AS_MSG_INFO1_BATCH) {
		as_batch_queue_task(&tr);
		return;
	}

	if (! as_transaction_prepare(&tr, true)) {
		as_transaction_demarshal_error(&tr, AS_ERR_PARAMETER);
		return;
	}

	as_tsvc_process_transaction(&tr);
}
829 | |
830 | static bool |
831 | decompress_msg(as_comp_proto* cproto, uint8_t** out_buf, uint64_t* out_buf_sz) |
832 | { |
833 | uint64_t orig_sz = cproto->orig_sz; |
834 | |
835 | // Hack to handle both little and big endian formats. Some clients wrongly |
836 | // send the size in little-endian format. If we interpret a legal big-endian |
837 | // size as little-endian, it will be > PROTO_SIZE_MAX. Use it as a clue. |
838 | if (orig_sz > PROTO_SIZE_MAX) { |
839 | orig_sz = cf_swap_from_be64(cproto->orig_sz); |
840 | |
841 | if (orig_sz > PROTO_SIZE_MAX) { |
842 | cf_warning(AS_SERVICE, "bad compressed packet size %lu" , orig_sz); |
843 | return false; |
844 | } |
845 | } |
846 | |
847 | uint8_t* decomp_buf = cf_malloc(orig_sz); |
848 | uint64_t decomp_buf_sz = orig_sz; |
849 | uint64_t comp_buf_sz = cproto->proto.sz - sizeof(cproto->orig_sz); |
850 | int rv = uncompress(decomp_buf, &decomp_buf_sz, cproto->data, comp_buf_sz); |
851 | |
852 | if (rv != Z_OK) { |
853 | cf_warning(AS_SERVICE, "zlib decompression failed with error %d" , rv); |
854 | cf_free(decomp_buf); |
855 | return false; |
856 | } |
857 | |
858 | if (orig_sz != decomp_buf_sz) { |
859 | cf_warning(AS_SERVICE, "decompressed size %lu is not expected size %lu" , |
860 | decomp_buf_sz, orig_sz); |
861 | cf_free(decomp_buf); |
862 | return false; |
863 | } |
864 | |
865 | *out_buf = decomp_buf; |
866 | *out_buf_sz = decomp_buf_sz; |
867 | |
868 | return true; |
869 | } |
870 | |
// Configure a socket that carries XDR transactions - called by
// start_transaction() on a connection's first XDR transaction. Applies the XDR
// receive and send buffer sizes, sets the window to the receive buffer size,
// and calls cf_socket_enable_nagle() on the socket.
static void
config_xdr_socket(cf_socket* sock)
{
	cf_socket_set_receive_buffer(sock, XDR_READ_BUFFER_SIZE);
	cf_socket_set_send_buffer(sock, XDR_WRITE_BUFFER_SIZE);
	cf_socket_set_window(sock, XDR_READ_BUFFER_SIZE);
	cf_socket_enable_nagle(sock);
}
879 | |
880 | |
881 | //========================================================== |
882 | // Local helpers - reap idle and bad connections. |
883 | // |
884 | |
885 | static void |
886 | start_reaper(void) |
887 | { |
888 | struct rlimit rl; |
889 | |
890 | if (getrlimit(RLIMIT_NOFILE, &rl) < 0) { |
891 | cf_crash(AS_SERVICE, "getrlimit() failed: %s" , cf_strerror(errno)); |
892 | } |
893 | |
894 | g_n_slots = (uint32_t)rl.rlim_cur; |
895 | g_file_handles = cf_calloc(g_n_slots, sizeof(as_file_handle*)); |
896 | |
897 | cf_queue_init(&g_free_slots, sizeof(uint32_t), g_n_slots, false); |
898 | |
899 | for (uint32_t i = 0; i < g_n_slots; i++) { |
900 | cf_queue_push(&g_free_slots, &i); |
901 | } |
902 | |
903 | cf_info(AS_SERVICE, "starting reaper thread" ); |
904 | |
905 | cf_thread_create_detached(run_reaper, NULL); |
906 | } |
907 | |
908 | static void* |
909 | run_reaper(void* udata) |
910 | { |
911 | (void)udata; |
912 | |
913 | while (true) { |
914 | sleep(1); |
915 | |
916 | bool security_refresh = as_security_should_refresh(); |
917 | |
918 | uint64_t kill_ns = (uint64_t)g_config.proto_fd_idle_ms * 1000000; |
919 | uint64_t now_ns = cf_getns(); |
920 | |
921 | cf_mutex_lock(&g_reaper_lock); |
922 | |
923 | uint32_t n_remaining = g_n_slots - (uint32_t)cf_queue_sz(&g_free_slots); |
924 | |
925 | for (uint32_t i = 0; n_remaining != 0; i++) { |
926 | as_file_handle* fd_h = g_file_handles[i]; |
927 | |
928 | if (fd_h == NULL) { |
929 | continue; |
930 | } |
931 | |
932 | n_remaining--; |
933 | |
934 | if (security_refresh) { |
935 | as_security_refresh(fd_h); |
936 | } |
937 | |
938 | // reap_me overrides do_not_reap. |
939 | if (fd_h->reap_me) { |
940 | g_file_handles[i] = NULL; |
941 | cf_queue_push_head(&g_free_slots, &i); |
942 | as_release_file_handle(fd_h); |
943 | continue; |
944 | } |
945 | |
946 | if (fd_h->in_transaction != 0) { |
947 | continue; |
948 | } |
949 | |
950 | if (kill_ns != 0 && fd_h->last_used + kill_ns < now_ns) { |
951 | cf_socket_shutdown(&fd_h->sock); // will trigger epoll errors |
952 | |
953 | g_file_handles[i] = NULL; |
954 | cf_queue_push_head(&g_free_slots, &i); |
955 | as_release_file_handle(fd_h); |
956 | |
957 | g_stats.reaper_count++; |
958 | } |
959 | } |
960 | |
961 | cf_mutex_unlock(&g_reaper_lock); |
962 | } |
963 | |
964 | return NULL; |
965 | } |
966 | |
967 | |
968 | //========================================================== |
969 | // Local helpers - transaction queue. |
970 | // |
971 | |
972 | static bool |
973 | start_internal_transaction(thread_ctx* ctx) |
974 | { |
975 | as_transaction tr; |
976 | |
977 | cf_mutex_lock(ctx->lock); |
978 | |
979 | if (! cf_epoll_queue_pop(&ctx->trans_q, &tr)) { |
980 | cf_crash(AS_SERVICE, "unable to pop from transaction queue" ); |
981 | } |
982 | |
983 | cf_mutex_unlock(ctx->lock); |
984 | |
985 | if (tr.msgp == NULL) { |
986 | return false; |
987 | } |
988 | |
989 | if (g_config.svc_benchmarks_enabled && |
990 | tr.benchmark_time != 0 && ! as_transaction_is_restart(&tr)) { |
991 | histogram_insert_data_point(g_stats.svc_queue_hist, tr.benchmark_time); |
992 | } |
993 | |
994 | as_tsvc_process_transaction(&tr); |
995 | |
996 | return true; |
997 | } |
998 | |