1/*
2 * service.c
3 *
4 * Copyright (C) 2018 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "base/service.h"
28
29#include <errno.h>
30#include <sched.h>
31#include <stdbool.h>
32#include <stddef.h>
33#include <stdint.h>
34#include <sys/epoll.h>
35#include <sys/resource.h>
36#include <sys/time.h>
37#include <unistd.h>
38#include <zlib.h>
39
40#include "citrusleaf/alloc.h"
41#include "citrusleaf/cf_atomic.h"
42#include "citrusleaf/cf_clock.h"
43#include "citrusleaf/cf_queue.h"
44
45#include "cf_mutex.h"
46#include "cf_thread.h"
47#include "epoll_queue.h"
48#include "fault.h"
49#include "hardware.h"
50#include "hist.h"
51#include "socket.h"
52#include "tls.h"
53
54#include "base/batch.h"
55#include "base/cfg.h"
56#include "base/datamodel.h"
57#include "base/proto.h"
58#include "base/security.h"
59#include "base/stats.h"
60#include "base/thr_info.h"
61#include "base/thr_tsvc.h"
62#include "base/transaction.h"
63#include "base/xdr_serverside.h"
64
65#include "warnings.h"
66
67
68//==========================================================
69// Typedefs & constants.
70//
71
72#define N_EVENTS 1024
73
74#define XDR_WRITE_BUFFER_SIZE (5 * 1024 * 1024)
75#define XDR_READ_BUFFER_SIZE (15 * 1024 * 1024)
76
77typedef struct thread_ctx_s {
78 cf_topo_cpu_index i_cpu;
79 cf_mutex* lock;
80 cf_poll poll;
81 cf_epoll_queue trans_q;
82} thread_ctx;
83
84
85//==========================================================
86// Globals.
87//
88
89as_service_access g_access = {
90 .service = { .addrs = { .n_addrs = 0 }, .port = 0 },
91 .alt_service = { .addrs = { .n_addrs = 0 }, .port = 0 },
92 .tls_service = { .addrs = { .n_addrs = 0 }, .port = 0 },
93 .alt_tls_service = { .addrs = { .n_addrs = 0 }, .port = 0 }
94};
95
96cf_serv_cfg g_service_bind = { .n_cfgs = 0 };
97cf_tls_info* g_service_tls;
98
99static cf_sockets g_sockets;
100
101static cf_mutex g_thread_locks[MAX_SERVICE_THREADS];
102static thread_ctx* g_thread_ctxs[MAX_SERVICE_THREADS];
103
104static cf_mutex g_reaper_lock = CF_MUTEX_INIT;
105static uint32_t g_n_slots;
106static as_file_handle** g_file_handles;
107static cf_queue g_free_slots;
108
109
110//==========================================================
111// Forward declarations.
112//
113
114// Setup.
115static void create_service_thread(uint32_t sid);
116static void add_localhost(cf_serv_cfg* serv_cfg, cf_sock_owner owner);
117
118// Accept client connections.
119static void* run_accept(void* udata);
120
121// Assign client connections to threads.
122static void assign_socket(as_file_handle* fd_h, uint32_t events);
123static uint32_t select_sid(void);
124static uint32_t select_sid_pinned(cf_topo_cpu_index i_cpu);
125static uint32_t select_sid_adq(cf_topo_napi_id id);
126static void schedule_redistribution(void);
127
128// Demarshal client requests.
129static void* run_service(void* udata);
130static void stop_service(thread_ctx* ctx);
131static void service_release_file_handle(as_file_handle* fd_h);
132static bool process_readable(as_file_handle* fd_h);
133static void start_transaction(as_file_handle* fd_h);
134static bool decompress_msg(as_comp_proto* cproto, uint8_t** out_buf, uint64_t* out_buf_sz);
135static void config_xdr_socket(cf_socket* sock);
136
137// Reap idle and bad connections.
138static void start_reaper(void);
139static void* run_reaper(void* udata);
140
141// Transaction queue.
142static bool start_internal_transaction(thread_ctx* ctx);
143
144
145//==========================================================
146// Public API.
147//
148
149void
150as_service_init(void)
151{
152 // Create epoll instances and service threads.
153
154 cf_info(AS_SERVICE, "starting %u service threads",
155 g_config.n_service_threads);
156
157 for (uint32_t i = 0; i < MAX_SERVICE_THREADS; i++) {
158 cf_mutex_init(&g_thread_locks[i]);
159 }
160
161 for (uint32_t i = 0; i < g_config.n_service_threads; i++) {
162 create_service_thread(i);
163 }
164}
165
166void
167as_service_start(void)
168{
169 start_reaper();
170
171 // Create listening sockets.
172
173 add_localhost(&g_service_bind, CF_SOCK_OWNER_SERVICE);
174 add_localhost(&g_service_bind, CF_SOCK_OWNER_SERVICE_TLS);
175
176 as_xdr_info_port(&g_service_bind);
177
178 if (cf_socket_init_server(&g_service_bind, &g_sockets) < 0) {
179 cf_crash(AS_SERVICE, "couldn't initialize service socket");
180 }
181
182 cf_socket_show_server(AS_SERVICE, "client", &g_sockets);
183
184 // Create accept thread.
185
186 cf_info(AS_SERVICE, "starting accept thread");
187
188 cf_thread_create_detached(run_accept, NULL);
189}
190
191void
192as_service_set_threads(uint32_t n_threads)
193{
194 uint32_t old_n_threads = g_config.n_service_threads;
195
196 if (n_threads > old_n_threads) {
197 for (uint32_t sid = old_n_threads; sid < n_threads; sid++) {
198 create_service_thread(sid);
199 }
200
201 g_config.n_service_threads = n_threads;
202
203 schedule_redistribution();
204 }
205 else if (n_threads < old_n_threads) {
206 g_config.n_service_threads = n_threads;
207
208 for (uint32_t sid = n_threads; sid < old_n_threads; sid++) {
209 cf_mutex_lock(&g_thread_locks[sid]);
210
211 thread_ctx* ctx = g_thread_ctxs[sid];
212
213 cf_detail(AS_SERVICE, "sending terminator sid %u ctx %p", sid, ctx);
214
215 as_transaction tr = { .msgp = NULL };
216
217 cf_epoll_queue_push(&ctx->trans_q, &tr);
218 g_thread_ctxs[sid] = NULL;
219
220 cf_mutex_unlock(&g_thread_locks[sid]);
221 }
222 }
223}
224
225void
226as_service_rearm(as_file_handle* fd_h)
227{
228 cf_poll_modify_socket(fd_h->poll, &fd_h->sock,
229 EPOLLIN | EPOLLONESHOT | EPOLLRDHUP, fd_h);
230}
231
232void
233as_service_enqueue_internal(as_transaction* tr)
234{
235 while (true) {
236 uint32_t sid = as_config_is_cpu_pinned() ?
237 select_sid_pinned(cf_topo_current_cpu()) : select_sid();
238
239 cf_mutex_lock(&g_thread_locks[sid]);
240
241 thread_ctx* ctx = g_thread_ctxs[sid];
242
243 if (ctx != NULL) {
244 cf_epoll_queue_push(&ctx->trans_q, tr);
245 cf_mutex_unlock(&g_thread_locks[sid]);
246 break;
247 }
248
249 cf_mutex_unlock(&g_thread_locks[sid]);
250 }
251}
252
253
254//==========================================================
255// Local helpers - setup.
256//
257
258void
259create_service_thread(uint32_t sid)
260{
261 thread_ctx* ctx = cf_malloc(sizeof(thread_ctx));
262
263 cf_detail(AS_SERVICE, "starting sid %u ctx %p", sid, ctx);
264
265 if (as_config_is_cpu_pinned()) {
266 ctx->i_cpu = (cf_topo_cpu_index)(sid % cf_topo_count_cpus());
267 }
268
269 ctx->lock = &g_thread_locks[sid];
270 cf_poll_create(&ctx->poll);
271 cf_epoll_queue_init(&ctx->trans_q, AS_TRANSACTION_HEAD_SIZE, 64);
272
273 cf_thread_create_detached(run_service, ctx);
274
275 cf_mutex_lock(&g_thread_locks[sid]);
276
277 g_thread_ctxs[sid] = ctx;
278
279 cf_mutex_unlock(&g_thread_locks[sid]);
280}
281
282static void
283add_localhost(cf_serv_cfg* serv_cfg, cf_sock_owner owner)
284{
285 // Localhost will only be added to the addresses, if we're not yet listening
286 // on wildcard ("any") or localhost.
287
288 cf_ip_port port = 0;
289
290 for (uint32_t i = 0; i < serv_cfg->n_cfgs; i++) {
291 if (serv_cfg->cfgs[i].owner != owner) {
292 continue;
293 }
294
295 port = serv_cfg->cfgs[i].port;
296
297 if (cf_ip_addr_is_any(&serv_cfg->cfgs[i].addr) ||
298 cf_ip_addr_is_local(&serv_cfg->cfgs[i].addr)) {
299 return;
300 }
301 }
302
303 if (port == 0) {
304 return;
305 }
306
307 cf_sock_cfg sock_cfg;
308
309 cf_sock_cfg_init(&sock_cfg, owner);
310 sock_cfg.port = port;
311 cf_ip_addr_set_local(&sock_cfg.addr);
312
313 if (cf_serv_cfg_add_sock_cfg(serv_cfg, &sock_cfg) < 0) {
314 cf_crash(AS_SERVICE, "couldn't add localhost listening address");
315 }
316}
317
318
319//==========================================================
320// Local helpers - accept client connections.
321//
322
323static void*
324run_accept(void* udata)
325{
326 (void)udata;
327
328 cf_poll poll;
329 cf_poll_create(&poll);
330
331 cf_poll_add_sockets(poll, &g_sockets, EPOLLIN);
332
333 while (true) {
334 cf_poll_event events[N_EVENTS];
335 int32_t n_events = cf_poll_wait(poll, events, N_EVENTS, -1);
336
337 cf_assert(n_events >= 0, AS_SERVICE, "unexpected EINTR");
338
339 for (uint32_t i = 0; i < (uint32_t)n_events; i++) {
340 cf_socket* ssock = events[i].data;
341 cf_socket csock;
342 cf_sock_addr caddr;
343
344 if (cf_socket_accept(ssock, &csock, &caddr) < 0) {
345 if (errno == EMFILE || errno == ENFILE) {
346 cf_ticker_warning(AS_SERVICE, "out of file descriptors");
347 continue;
348 }
349
350 cf_crash(AS_SERVICE, "accept() failed: %d (%s)", errno,
351 cf_strerror(errno));
352 }
353
354 cf_sock_cfg* cfg = ssock->cfg;
355
356 // Ensure that proto_connections_closed is read first.
357 uint64_t n_closed = g_stats.proto_connections_closed;
358 uint64_t n_opened = g_stats.proto_connections_opened;
359 uint64_t n_open = n_opened - n_closed;
360
361 // TODO - XDR exemption to become a special feature.
362 if (n_open >= g_config.n_proto_fd_max &&
363 cfg->owner != CF_SOCK_OWNER_XDR) {
364 cf_ticker_warning(AS_SERVICE,
365 "refusing client connection - proto-fd-max %u",
366 g_config.n_proto_fd_max);
367
368 cf_socket_close(&csock);
369 cf_socket_term(&csock);
370 continue;
371 }
372
373 if (cfg->owner == CF_SOCK_OWNER_SERVICE_TLS) {
374 tls_socket_prepare_server(g_service_tls, &csock);
375 }
376
377 as_file_handle* fd_h = cf_rc_alloc(sizeof(as_file_handle));
378 // Ref for epoll instance.
379
380 cf_sock_addr_to_string_safe(&caddr, fd_h->client,
381 sizeof(fd_h->client));
382 cf_socket_copy(&csock, &fd_h->sock);
383
384 fd_h->last_used = cf_getns();
385 fd_h->in_transaction = 0;
386 fd_h->move_me = false;
387 fd_h->reap_me = false;
388 fd_h->is_xdr = false;
389 fd_h->proto = NULL;
390 fd_h->proto_unread = sizeof(as_proto);
391 fd_h->security_filter = as_security_filter_create();
392
393 cf_rc_reserve(fd_h); // ref for reaper
394
395 cf_mutex_lock(&g_reaper_lock);
396
397 uint32_t slot;
398
399 if (cf_queue_pop(&g_free_slots, &slot, CF_QUEUE_NOWAIT) !=
400 CF_QUEUE_OK) {
401 cf_crash(AS_SERVICE, "cannot get free slot");
402 }
403
404 g_file_handles[slot] = fd_h;
405
406 cf_mutex_unlock(&g_reaper_lock);
407
408 assign_socket(fd_h, EPOLLIN); // needs to be armed (EPOLLIN)
409
410 cf_atomic64_incr(&g_stats.proto_connections_opened);
411 }
412 }
413
414 return NULL;
415}
416
417
418//==========================================================
419// Local helpers - assign client connections to threads.
420//
421
422static void
423assign_socket(as_file_handle* fd_h, uint32_t events)
424{
425 while (true) {
426 uint32_t sid;
427
428 switch (g_config.auto_pin) {
429 case CF_TOPO_AUTO_PIN_NONE:
430 sid = select_sid();
431 break;
432 case CF_TOPO_AUTO_PIN_CPU:
433 case CF_TOPO_AUTO_PIN_NUMA:
434 sid = select_sid_pinned(cf_topo_socket_cpu(&fd_h->sock));
435 break;
436 case CF_TOPO_AUTO_PIN_ADQ:
437 sid = select_sid_adq(cf_topo_socket_napi_id(&fd_h->sock));
438 break;
439 default:
440 cf_crash(AS_SERVICE, "bad auto-pin %d", g_config.auto_pin);
441 return;
442 }
443
444 cf_mutex_lock(&g_thread_locks[sid]);
445
446 thread_ctx* ctx = g_thread_ctxs[sid];
447
448 if (ctx != NULL) {
449 fd_h->poll = ctx->poll;
450
451 cf_poll_add_socket(fd_h->poll, &fd_h->sock,
452 events | EPOLLONESHOT | EPOLLRDHUP, fd_h);
453
454 cf_mutex_unlock(&g_thread_locks[sid]);
455 break;
456 }
457
458 cf_mutex_unlock(&g_thread_locks[sid]);
459 }
460}
461
462static uint32_t
463select_sid(void)
464{
465 static uint32_t rr = 0;
466
467 return rr++ % g_config.n_service_threads;
468}
469
470static uint32_t
471select_sid_pinned(cf_topo_cpu_index i_cpu)
472{
473 static uint32_t rr[CPU_SETSIZE] = { 0 };
474
475 uint16_t n_cpus = cf_topo_count_cpus();
476 uint32_t threads_per_cpu = g_config.n_service_threads / n_cpus;
477
478 uint32_t thread_ix = rr[i_cpu]++ % threads_per_cpu;
479
480 return (thread_ix * n_cpus) + i_cpu;
481}
482
483static uint32_t
484select_sid_adq(cf_topo_napi_id id)
485{
486 return id == 0 ? select_sid() : id % g_config.n_service_threads;
487}
488
489static void
490schedule_redistribution(void)
491{
492 cf_mutex_lock(&g_reaper_lock);
493
494 uint32_t n_remaining = g_n_slots - (uint32_t)cf_queue_sz(&g_free_slots);
495
496 for (uint32_t i = 0; n_remaining != 0; i++) {
497 as_file_handle* fd_h = g_file_handles[i];
498
499 if (fd_h != NULL) {
500 fd_h->move_me = true;
501 n_remaining--;
502 }
503 }
504
505 cf_mutex_unlock(&g_reaper_lock);
506}
507
508
509//==========================================================
510// Local helpers - demarshal client requests.
511//
512
513static void*
514run_service(void* udata)
515{
516 thread_ctx* ctx = (thread_ctx*)udata;
517
518 cf_detail(AS_SERVICE, "running ctx %p", ctx);
519
520 if (as_config_is_cpu_pinned()) {
521 cf_topo_pin_to_cpu(ctx->i_cpu);
522 }
523
524 cf_poll poll = ctx->poll;
525 cf_epoll_queue* trans_q = &ctx->trans_q;
526
527 cf_poll_add_fd(poll, trans_q->event_fd, EPOLLIN, trans_q);
528
529 while (true) {
530 cf_poll_event events[N_EVENTS];
531 int32_t n_events = cf_poll_wait(poll, events, N_EVENTS, -1);
532
533 cf_assert(n_events >= 0, AS_SERVICE, "unexpected EINTR");
534
535 for (uint32_t i = 0; i < (uint32_t)n_events; i++) {
536 if (events[i].data == trans_q) {
537 cf_assert(events[i].events == EPOLLIN, AS_SERVICE,
538 "unexpected event: 0x%0x", events[i].events);
539
540 if (start_internal_transaction(ctx)) {
541 continue;
542 }
543
544 stop_service(ctx);
545
546 return NULL;
547 }
548
549 as_file_handle* fd_h = events[i].data;
550
551 if ((events[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) != 0) {
552 service_release_file_handle(fd_h);
553 continue;
554 }
555
556 if (tls_socket_needs_handshake(&fd_h->sock)) {
557 int32_t tls_ev = tls_socket_accept(&fd_h->sock);
558
559 if (tls_ev == EPOLLERR) {
560 service_release_file_handle(fd_h);
561 continue;
562 }
563
564 if (tls_ev == 0) {
565 tls_socket_must_not_have_data(&fd_h->sock,
566 "service handshake");
567 tls_ev = EPOLLIN;
568 }
569
570 cf_poll_modify_socket(fd_h->poll, &fd_h->sock,
571 (uint32_t)tls_ev | EPOLLONESHOT | EPOLLRDHUP, fd_h);
572 continue;
573 }
574
575 if (fd_h->proto == NULL && fd_h->proto_unread == sizeof(as_proto)) {
576 fd_h->last_used = cf_getns(); // request start time - for now
577 }
578
579 if (! process_readable(fd_h)) {
580 service_release_file_handle(fd_h);
581 continue;
582 }
583
584 tls_socket_must_not_have_data(&fd_h->sock, "full client read");
585
586 if (fd_h->proto_unread != 0) {
587 as_service_rearm(fd_h);
588 continue;
589 }
590
591 if (fd_h->move_me) {
592 cf_poll_delete_socket(fd_h->poll, &fd_h->sock);
593 assign_socket(fd_h, 0); // known to be unarmed (no EPOLLIN)
594
595 fd_h->move_me = false;
596 }
597
598 // Note that epoll cannot trigger again for this file handle during
599 // the transaction. We'll rearm at the end of the transaction.
600 start_transaction(fd_h);
601 }
602 }
603
604 return NULL;
605}
606
607static void
608stop_service(thread_ctx* ctx)
609{
610 cf_detail(AS_SERVICE, "stopping ctx %p", ctx);
611
612 while (true) {
613 bool any_in_transaction = false;
614
615 cf_mutex_lock(&g_reaper_lock);
616
617 uint32_t n_remaining = g_n_slots - (uint32_t)cf_queue_sz(&g_free_slots);
618
619 for (uint32_t i = 0; n_remaining != 0; i++) {
620 as_file_handle* fd_h = g_file_handles[i];
621
622 if (fd_h == NULL) {
623 continue;
624 }
625
626 n_remaining--;
627
628 // Ignore, if another thread's or INVALID_POLL.
629 if (! cf_poll_equal(fd_h->poll, ctx->poll)) {
630 continue;
631 }
632
633 // Don't transfer during TLS handshake - might need EPOLLOUT.
634 if (tls_socket_needs_handshake(&fd_h->sock)) {
635 service_release_file_handle(fd_h);
636 continue;
637 }
638
639 if (fd_h->in_transaction != 0) {
640 any_in_transaction = true;
641 continue;
642 }
643
644 cf_poll_delete_socket(fd_h->poll, &fd_h->sock);
645 assign_socket(fd_h, EPOLLIN); // known to be armed (EPOLLIN)
646 }
647
648 cf_mutex_unlock(&g_reaper_lock);
649
650 if (! any_in_transaction) {
651 break;
652 }
653
654 sleep(1);
655 }
656
657 cf_poll_destroy(ctx->poll);
658 cf_epoll_queue_destroy(&ctx->trans_q);
659
660 cf_free(ctx);
661
662 cf_detail(AS_SERVICE, "stopped ctx %p", ctx);
663}
664
665static void
666service_release_file_handle(as_file_handle* fd_h)
667{
668 cf_poll_delete_socket(fd_h->poll, &fd_h->sock);
669 fd_h->poll = INVALID_POLL;
670 fd_h->reap_me = true;
671 as_release_file_handle(fd_h);
672}
673
674static bool
675process_readable(as_file_handle* fd_h)
676{
677 uint8_t* end = fd_h->proto == NULL ?
678 (uint8_t*)&fd_h->proto_hdr + sizeof(as_proto) : // header
679 fd_h->proto->body + fd_h->proto->sz; // body
680
681 while (true) {
682 int32_t sz = cf_socket_recv(&fd_h->sock, end - fd_h->proto_unread,
683 fd_h->proto_unread, 0);
684
685 if (sz < 0) {
686 return errno == EAGAIN || errno == EWOULDBLOCK;
687 }
688
689 if (sz == 0) {
690 return false;
691 }
692
693 fd_h->proto_unread -= (uint64_t)sz;
694
695 if (fd_h->proto_unread != 0) {
696 continue; // drain socket (and OpenSSL's internal buffer) dry
697 }
698
699 if (fd_h->proto != NULL) {
700 return true; // done with entire request
701 }
702 // else - switch from header to body.
703
704 // Check for a TLS ClientHello arriving at a non-TLS socket. Heuristic:
705 // - tls[0] == ContentType.handshake (22)
706 // - tls[1] == ProtocolVersion.major (3)
707 // - tls[5] == HandshakeType.client_hello (1)
708
709 uint8_t* tls = (uint8_t*)&fd_h->proto_hdr;
710
711 if (tls[0] == 22 && tls[1] == 3 && tls[5] == 1) {
712 cf_warning(AS_SERVICE, "ignoring TLS connection from %s",
713 fd_h->client);
714 return false;
715 }
716
717 // For backward compatibility, allow version 0 with security messages.
718 if (fd_h->proto_hdr.version != PROTO_VERSION &&
719 ! (fd_h->proto_hdr.version == 0 &&
720 fd_h->proto_hdr.type == PROTO_TYPE_SECURITY)) {
721 cf_warning(AS_SERVICE, "unsupported proto version %d from %s",
722 fd_h->proto_hdr.version, fd_h->client);
723 return false;
724 }
725
726 if (! as_proto_is_valid_type(&fd_h->proto_hdr)) {
727 cf_warning(AS_SERVICE, "unsupported proto type %d from %s",
728 fd_h->proto_hdr.type, fd_h->client);
729 return false;
730 }
731
732 as_proto_swap(&fd_h->proto_hdr);
733
734 if (fd_h->proto_hdr.sz > PROTO_SIZE_MAX) {
735 cf_warning(AS_SERVICE, "invalid proto size %lu from %s",
736 (uint64_t)fd_h->proto_hdr.sz, fd_h->client);
737 return false;
738 }
739
740 fd_h->proto = cf_malloc(sizeof(as_proto) + fd_h->proto_hdr.sz);
741 memcpy(fd_h->proto, &fd_h->proto_hdr, sizeof(as_proto));
742
743 fd_h->proto_unread = fd_h->proto->sz;
744 end = fd_h->proto->body + fd_h->proto->sz;
745 }
746}
747
748static void
749start_transaction(as_file_handle* fd_h)
750{
751 // as_end_of_transaction() rearms then decrements, so this may be > 1.
752 as_incr_uint32(&fd_h->in_transaction);
753
754 uint64_t start_ns = fd_h->last_used;
755 as_proto* proto = fd_h->proto;
756
757 fd_h->proto = NULL;
758 fd_h->proto_unread = sizeof(as_proto);
759
760 if (proto->type == PROTO_TYPE_INFO) {
761 as_info_transaction it = {
762 .fd_h = fd_h,
763 .proto = proto,
764 .start_time = start_ns
765 };
766
767 as_info(&it);
768 return;
769 }
770
771 as_transaction tr;
772 as_transaction_init_head(&tr, NULL, (cl_msg*)proto);
773
774 tr.origin = FROM_CLIENT;
775 tr.from.proto_fd_h = fd_h;
776 tr.start_time = start_ns;
777
778 if (proto->type == PROTO_TYPE_SECURITY) {
779 as_security_transact(&tr);
780 return;
781 }
782
783 if (proto->type == PROTO_TYPE_AS_MSG_COMPRESSED) {
784 uint8_t* buf = NULL;
785 uint64_t buf_sz = 0;
786
787 if (! decompress_msg((as_comp_proto*)proto, &buf, &buf_sz)) {
788 as_transaction_demarshal_error(&tr, AS_ERR_UNKNOWN);
789 return;
790 }
791
792 cf_free(proto);
793
794 proto = (as_proto*)buf;
795 tr.msgp = (cl_msg*)proto;
796
797 as_proto_swap(proto);
798
799 if (! as_proto_wrapped_is_valid(proto, buf_sz)) {
800 cf_warning(AS_SERVICE, "decompressed proto: (%d,%d,%lu,%lu)",
801 proto->version, proto->type, (uint64_t)proto->sz, buf_sz);
802 as_transaction_demarshal_error(&tr, AS_ERR_UNKNOWN);
803 return;
804 }
805 }
806
807 if (as_transaction_is_xdr(&tr) && ! fd_h->is_xdr) {
808 config_xdr_socket(&fd_h->sock);
809 fd_h->is_xdr = true;
810 }
811
812 if (g_config.svc_benchmarks_enabled) {
813 tr.benchmark_time = histogram_insert_data_point(
814 g_stats.svc_demarshal_hist, start_ns);
815 }
816
817 if (tr.msgp->msg.info1 & AS_MSG_INFO1_BATCH) {
818 as_batch_queue_task(&tr);
819 return;
820 }
821
822 if (! as_transaction_prepare(&tr, true)) {
823 as_transaction_demarshal_error(&tr, AS_ERR_PARAMETER);
824 return;
825 }
826
827 as_tsvc_process_transaction(&tr);
828}
829
// Decompress a compressed proto message with zlib.
//
// cproto     - compressed wrapper: proto header, original (uncompressed)
//              size, compressed data. The header has already been swapped to
//              host order.
// out_buf    - on success, receives a newly allocated buffer holding the
//              decompressed message. Caller owns it and must cf_free() it.
// out_buf_sz - on success, receives the size of that buffer.
//
// Returns false (with nothing allocated and outputs untouched) if any size is
// implausible or zlib fails.
static bool
decompress_msg(as_comp_proto* cproto, uint8_t** out_buf, uint64_t* out_buf_sz)
{
	uint64_t wrapped_sz = (uint64_t)cproto->proto.sz;

	// The body size comes from the client. It must at least cover the
	// original-size field - otherwise reading that field would go past the
	// end of the request buffer, and the compressed-data size computed below
	// would wrap around to a huge value.
	if (wrapped_sz < sizeof(cproto->orig_sz)) {
		cf_warning(AS_SERVICE, "bad compressed packet body size %lu",
				wrapped_sz);
		return false;
	}

	uint64_t orig_sz = cproto->orig_sz;

	// Hack to handle both little and big endian formats. Some clients wrongly
	// send the size in little-endian format. If we interpret a legal big-endian
	// size as little-endian, it will be > PROTO_SIZE_MAX. Use it as a clue.
	if (orig_sz > PROTO_SIZE_MAX) {
		orig_sz = cf_swap_from_be64(cproto->orig_sz);

		if (orig_sz > PROTO_SIZE_MAX) {
			cf_warning(AS_SERVICE, "bad compressed packet size %lu", orig_sz);
			return false;
		}
	}

	// The caller treats the result as an as_proto and swaps its header before
	// validating it, so the buffer must at least hold a header.
	if (orig_sz < sizeof(as_proto)) {
		cf_warning(AS_SERVICE, "bad compressed packet size %lu", orig_sz);
		return false;
	}

	uint8_t* decomp_buf = cf_malloc(orig_sz);
	uint64_t decomp_buf_sz = orig_sz; // in: capacity, out: bytes produced
	uint64_t comp_buf_sz = wrapped_sz - sizeof(cproto->orig_sz);
	int rv = uncompress(decomp_buf, &decomp_buf_sz, cproto->data, comp_buf_sz);

	if (rv != Z_OK) {
		cf_warning(AS_SERVICE, "zlib decompression failed with error %d", rv);
		cf_free(decomp_buf);
		return false;
	}

	// The client's claim about the original size must be exact.
	if (orig_sz != decomp_buf_sz) {
		cf_warning(AS_SERVICE, "decompressed size %lu is not expected size %lu",
				decomp_buf_sz, orig_sz);
		cf_free(decomp_buf);
		return false;
	}

	*out_buf = decomp_buf;
	*out_buf_sz = decomp_buf_sz;

	return true;
}
870
871static void
872config_xdr_socket(cf_socket* sock)
873{
874 cf_socket_set_receive_buffer(sock, XDR_READ_BUFFER_SIZE);
875 cf_socket_set_send_buffer(sock, XDR_WRITE_BUFFER_SIZE);
876 cf_socket_set_window(sock, XDR_READ_BUFFER_SIZE);
877 cf_socket_enable_nagle(sock);
878}
879
880
881//==========================================================
882// Local helpers - reap idle and bad connections.
883//
884
885static void
886start_reaper(void)
887{
888 struct rlimit rl;
889
890 if (getrlimit(RLIMIT_NOFILE, &rl) < 0) {
891 cf_crash(AS_SERVICE, "getrlimit() failed: %s", cf_strerror(errno));
892 }
893
894 g_n_slots = (uint32_t)rl.rlim_cur;
895 g_file_handles = cf_calloc(g_n_slots, sizeof(as_file_handle*));
896
897 cf_queue_init(&g_free_slots, sizeof(uint32_t), g_n_slots, false);
898
899 for (uint32_t i = 0; i < g_n_slots; i++) {
900 cf_queue_push(&g_free_slots, &i);
901 }
902
903 cf_info(AS_SERVICE, "starting reaper thread");
904
905 cf_thread_create_detached(run_reaper, NULL);
906}
907
908static void*
909run_reaper(void* udata)
910{
911 (void)udata;
912
913 while (true) {
914 sleep(1);
915
916 bool security_refresh = as_security_should_refresh();
917
918 uint64_t kill_ns = (uint64_t)g_config.proto_fd_idle_ms * 1000000;
919 uint64_t now_ns = cf_getns();
920
921 cf_mutex_lock(&g_reaper_lock);
922
923 uint32_t n_remaining = g_n_slots - (uint32_t)cf_queue_sz(&g_free_slots);
924
925 for (uint32_t i = 0; n_remaining != 0; i++) {
926 as_file_handle* fd_h = g_file_handles[i];
927
928 if (fd_h == NULL) {
929 continue;
930 }
931
932 n_remaining--;
933
934 if (security_refresh) {
935 as_security_refresh(fd_h);
936 }
937
938 // reap_me overrides do_not_reap.
939 if (fd_h->reap_me) {
940 g_file_handles[i] = NULL;
941 cf_queue_push_head(&g_free_slots, &i);
942 as_release_file_handle(fd_h);
943 continue;
944 }
945
946 if (fd_h->in_transaction != 0) {
947 continue;
948 }
949
950 if (kill_ns != 0 && fd_h->last_used + kill_ns < now_ns) {
951 cf_socket_shutdown(&fd_h->sock); // will trigger epoll errors
952
953 g_file_handles[i] = NULL;
954 cf_queue_push_head(&g_free_slots, &i);
955 as_release_file_handle(fd_h);
956
957 g_stats.reaper_count++;
958 }
959 }
960
961 cf_mutex_unlock(&g_reaper_lock);
962 }
963
964 return NULL;
965}
966
967
968//==========================================================
969// Local helpers - transaction queue.
970//
971
972static bool
973start_internal_transaction(thread_ctx* ctx)
974{
975 as_transaction tr;
976
977 cf_mutex_lock(ctx->lock);
978
979 if (! cf_epoll_queue_pop(&ctx->trans_q, &tr)) {
980 cf_crash(AS_SERVICE, "unable to pop from transaction queue");
981 }
982
983 cf_mutex_unlock(ctx->lock);
984
985 if (tr.msgp == NULL) {
986 return false;
987 }
988
989 if (g_config.svc_benchmarks_enabled &&
990 tr.benchmark_time != 0 && ! as_transaction_is_restart(&tr)) {
991 histogram_insert_data_point(g_stats.svc_queue_hist, tr.benchmark_time);
992 }
993
994 as_tsvc_process_transaction(&tr);
995
996 return true;
997}
998