| 1 | /* |
| 2 | * service.c |
| 3 | * |
| 4 | * Copyright (C) 2018 Aerospike, Inc. |
| 5 | * |
| 6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
| 7 | * license agreements. |
| 8 | * |
| 9 | * This program is free software: you can redistribute it and/or modify it under |
| 10 | * the terms of the GNU Affero General Public License as published by the Free |
| 11 | * Software Foundation, either version 3 of the License, or (at your option) any |
| 12 | * later version. |
| 13 | * |
| 14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
| 15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
| 17 | * details. |
| 18 | * |
| 19 | * You should have received a copy of the GNU Affero General Public License |
| 20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
| 21 | */ |
| 22 | |
| 23 | //========================================================== |
| 24 | // Includes. |
| 25 | // |
| 26 | |
| 27 | #include "base/service.h" |
| 28 | |
| 29 | #include <errno.h> |
| 30 | #include <sched.h> |
| 31 | #include <stdbool.h> |
| 32 | #include <stddef.h> |
| 33 | #include <stdint.h> |
| 34 | #include <sys/epoll.h> |
| 35 | #include <sys/resource.h> |
| 36 | #include <sys/time.h> |
| 37 | #include <unistd.h> |
| 38 | #include <zlib.h> |
| 39 | |
| 40 | #include "citrusleaf/alloc.h" |
| 41 | #include "citrusleaf/cf_atomic.h" |
| 42 | #include "citrusleaf/cf_clock.h" |
| 43 | #include "citrusleaf/cf_queue.h" |
| 44 | |
| 45 | #include "cf_mutex.h" |
| 46 | #include "cf_thread.h" |
| 47 | #include "epoll_queue.h" |
| 48 | #include "fault.h" |
| 49 | #include "hardware.h" |
| 50 | #include "hist.h" |
| 51 | #include "socket.h" |
| 52 | #include "tls.h" |
| 53 | |
| 54 | #include "base/batch.h" |
| 55 | #include "base/cfg.h" |
| 56 | #include "base/datamodel.h" |
| 57 | #include "base/proto.h" |
| 58 | #include "base/security.h" |
| 59 | #include "base/stats.h" |
| 60 | #include "base/thr_info.h" |
| 61 | #include "base/thr_tsvc.h" |
| 62 | #include "base/transaction.h" |
| 63 | #include "base/xdr_serverside.h" |
| 64 | |
| 65 | #include "warnings.h" |
| 66 | |
| 67 | |
| 68 | //========================================================== |
| 69 | // Typedefs & constants. |
| 70 | // |
| 71 | |
// Maximum number of epoll events harvested per cf_poll_wait() call - used by
// both the accept thread and every service thread.
#define N_EVENTS 1024

// Socket buffer sizes config_xdr_socket() applies to a client connection once
// it is seen to carry XDR transactions.
#define XDR_WRITE_BUFFER_SIZE (5 * 1024 * 1024)
#define XDR_READ_BUFFER_SIZE (15 * 1024 * 1024)

// Per-service-thread state. Allocated by create_service_thread(), freed by the
// owning thread itself in stop_service().
typedef struct thread_ctx_s {
	cf_topo_cpu_index i_cpu; // CPU to pin to - only set and used when pinned
	cf_mutex* lock; // points at g_thread_locks[sid] - guards trans_q push/pop
	cf_poll poll; // epoll instance the thread waits on (sockets + trans_q)
	cf_epoll_queue trans_q; // internal transactions; NULL msgp = terminator
} thread_ctx;
| 83 | |
| 84 | |
| 85 | //========================================================== |
| 86 | // Globals. |
| 87 | // |
| 88 | |
// Service access endpoints (plain, alternate, TLS, alternate TLS). All start
// empty here. NOTE(review): presumably filled in from configuration elsewhere.
as_service_access g_access = {
	.service = { .addrs = { .n_addrs = 0 }, .port = 0 },
	.alt_service = { .addrs = { .n_addrs = 0 }, .port = 0 },
	.tls_service = { .addrs = { .n_addrs = 0 }, .port = 0 },
	.alt_tls_service = { .addrs = { .n_addrs = 0 }, .port = 0 }
};

// Client listening addresses. as_service_start() may append localhost entries
// (and whatever as_xdr_info_port() adds) before binding them.
cf_serv_cfg g_service_bind = { .n_cfgs = 0 };
// TLS context used to prepare sockets accepted on CF_SOCK_OWNER_SERVICE_TLS.
cf_tls_info* g_service_tls;

// Listening sockets bound from g_service_bind.
static cf_sockets g_sockets;

// Per-sid lock and thread context. A NULL context means the sid currently has
// no live thread (not yet created, or already sent its terminator).
static cf_mutex g_thread_locks[MAX_SERVICE_THREADS];
static thread_ctx* g_thread_ctxs[MAX_SERVICE_THREADS];

// Connection slot table shared by the accept, service and reaper threads, all
// guarded by g_reaper_lock: g_n_slots entries (sized from RLIMIT_NOFILE), each
// either NULL or an open file handle, plus a queue of free slot indices.
static cf_mutex g_reaper_lock = CF_MUTEX_INIT;
static uint32_t g_n_slots;
static as_file_handle** g_file_handles;
static cf_queue g_free_slots;
| 108 | |
| 109 | |
| 110 | //========================================================== |
| 111 | // Forward declarations. |
| 112 | // |
| 113 | |
// Setup.
static void create_service_thread(uint32_t sid);
static void add_localhost(cf_serv_cfg* serv_cfg, cf_sock_owner owner);

// Accept client connections.
static void* run_accept(void* udata);

// Assign client connections to threads. The select_sid*() helpers pick a
// service thread id according to the auto-pin mode.
static void assign_socket(as_file_handle* fd_h, uint32_t events);
static uint32_t select_sid(void);
static uint32_t select_sid_pinned(cf_topo_cpu_index i_cpu);
static uint32_t select_sid_adq(cf_topo_napi_id id);
static void schedule_redistribution(void);

// Demarshal client requests.
static void* run_service(void* udata);
static void stop_service(thread_ctx* ctx);
static void service_release_file_handle(as_file_handle* fd_h);
static bool process_readable(as_file_handle* fd_h);
static void start_transaction(as_file_handle* fd_h);
static bool decompress_msg(as_comp_proto* cproto, uint8_t** out_buf, uint64_t* out_buf_sz);
static void config_xdr_socket(cf_socket* sock);

// Reap idle and bad connections.
static void start_reaper(void);
static void* run_reaper(void* udata);

// Transaction queue.
static bool start_internal_transaction(thread_ctx* ctx);
| 143 | |
| 144 | |
| 145 | //========================================================== |
| 146 | // Public API. |
| 147 | // |
| 148 | |
| 149 | void |
| 150 | as_service_init(void) |
| 151 | { |
| 152 | // Create epoll instances and service threads. |
| 153 | |
| 154 | cf_info(AS_SERVICE, "starting %u service threads" , |
| 155 | g_config.n_service_threads); |
| 156 | |
| 157 | for (uint32_t i = 0; i < MAX_SERVICE_THREADS; i++) { |
| 158 | cf_mutex_init(&g_thread_locks[i]); |
| 159 | } |
| 160 | |
| 161 | for (uint32_t i = 0; i < g_config.n_service_threads; i++) { |
| 162 | create_service_thread(i); |
| 163 | } |
| 164 | } |
| 165 | |
| 166 | void |
| 167 | as_service_start(void) |
| 168 | { |
| 169 | start_reaper(); |
| 170 | |
| 171 | // Create listening sockets. |
| 172 | |
| 173 | add_localhost(&g_service_bind, CF_SOCK_OWNER_SERVICE); |
| 174 | add_localhost(&g_service_bind, CF_SOCK_OWNER_SERVICE_TLS); |
| 175 | |
| 176 | as_xdr_info_port(&g_service_bind); |
| 177 | |
| 178 | if (cf_socket_init_server(&g_service_bind, &g_sockets) < 0) { |
| 179 | cf_crash(AS_SERVICE, "couldn't initialize service socket" ); |
| 180 | } |
| 181 | |
| 182 | cf_socket_show_server(AS_SERVICE, "client" , &g_sockets); |
| 183 | |
| 184 | // Create accept thread. |
| 185 | |
| 186 | cf_info(AS_SERVICE, "starting accept thread" ); |
| 187 | |
| 188 | cf_thread_create_detached(run_accept, NULL); |
| 189 | } |
| 190 | |
| 191 | void |
| 192 | as_service_set_threads(uint32_t n_threads) |
| 193 | { |
| 194 | uint32_t old_n_threads = g_config.n_service_threads; |
| 195 | |
| 196 | if (n_threads > old_n_threads) { |
| 197 | for (uint32_t sid = old_n_threads; sid < n_threads; sid++) { |
| 198 | create_service_thread(sid); |
| 199 | } |
| 200 | |
| 201 | g_config.n_service_threads = n_threads; |
| 202 | |
| 203 | schedule_redistribution(); |
| 204 | } |
| 205 | else if (n_threads < old_n_threads) { |
| 206 | g_config.n_service_threads = n_threads; |
| 207 | |
| 208 | for (uint32_t sid = n_threads; sid < old_n_threads; sid++) { |
| 209 | cf_mutex_lock(&g_thread_locks[sid]); |
| 210 | |
| 211 | thread_ctx* ctx = g_thread_ctxs[sid]; |
| 212 | |
| 213 | cf_detail(AS_SERVICE, "sending terminator sid %u ctx %p" , sid, ctx); |
| 214 | |
| 215 | as_transaction tr = { .msgp = NULL }; |
| 216 | |
| 217 | cf_epoll_queue_push(&ctx->trans_q, &tr); |
| 218 | g_thread_ctxs[sid] = NULL; |
| 219 | |
| 220 | cf_mutex_unlock(&g_thread_locks[sid]); |
| 221 | } |
| 222 | } |
| 223 | } |
| 224 | |
| 225 | void |
| 226 | as_service_rearm(as_file_handle* fd_h) |
| 227 | { |
| 228 | cf_poll_modify_socket(fd_h->poll, &fd_h->sock, |
| 229 | EPOLLIN | EPOLLONESHOT | EPOLLRDHUP, fd_h); |
| 230 | } |
| 231 | |
| 232 | void |
| 233 | as_service_enqueue_internal(as_transaction* tr) |
| 234 | { |
| 235 | while (true) { |
| 236 | uint32_t sid = as_config_is_cpu_pinned() ? |
| 237 | select_sid_pinned(cf_topo_current_cpu()) : select_sid(); |
| 238 | |
| 239 | cf_mutex_lock(&g_thread_locks[sid]); |
| 240 | |
| 241 | thread_ctx* ctx = g_thread_ctxs[sid]; |
| 242 | |
| 243 | if (ctx != NULL) { |
| 244 | cf_epoll_queue_push(&ctx->trans_q, tr); |
| 245 | cf_mutex_unlock(&g_thread_locks[sid]); |
| 246 | break; |
| 247 | } |
| 248 | |
| 249 | cf_mutex_unlock(&g_thread_locks[sid]); |
| 250 | } |
| 251 | } |
| 252 | |
| 253 | |
| 254 | //========================================================== |
| 255 | // Local helpers - setup. |
| 256 | // |
| 257 | |
| 258 | void |
| 259 | create_service_thread(uint32_t sid) |
| 260 | { |
| 261 | thread_ctx* ctx = cf_malloc(sizeof(thread_ctx)); |
| 262 | |
| 263 | cf_detail(AS_SERVICE, "starting sid %u ctx %p" , sid, ctx); |
| 264 | |
| 265 | if (as_config_is_cpu_pinned()) { |
| 266 | ctx->i_cpu = (cf_topo_cpu_index)(sid % cf_topo_count_cpus()); |
| 267 | } |
| 268 | |
| 269 | ctx->lock = &g_thread_locks[sid]; |
| 270 | cf_poll_create(&ctx->poll); |
| 271 | cf_epoll_queue_init(&ctx->trans_q, AS_TRANSACTION_HEAD_SIZE, 64); |
| 272 | |
| 273 | cf_thread_create_detached(run_service, ctx); |
| 274 | |
| 275 | cf_mutex_lock(&g_thread_locks[sid]); |
| 276 | |
| 277 | g_thread_ctxs[sid] = ctx; |
| 278 | |
| 279 | cf_mutex_unlock(&g_thread_locks[sid]); |
| 280 | } |
| 281 | |
| 282 | static void |
| 283 | add_localhost(cf_serv_cfg* serv_cfg, cf_sock_owner owner) |
| 284 | { |
| 285 | // Localhost will only be added to the addresses, if we're not yet listening |
| 286 | // on wildcard ("any") or localhost. |
| 287 | |
| 288 | cf_ip_port port = 0; |
| 289 | |
| 290 | for (uint32_t i = 0; i < serv_cfg->n_cfgs; i++) { |
| 291 | if (serv_cfg->cfgs[i].owner != owner) { |
| 292 | continue; |
| 293 | } |
| 294 | |
| 295 | port = serv_cfg->cfgs[i].port; |
| 296 | |
| 297 | if (cf_ip_addr_is_any(&serv_cfg->cfgs[i].addr) || |
| 298 | cf_ip_addr_is_local(&serv_cfg->cfgs[i].addr)) { |
| 299 | return; |
| 300 | } |
| 301 | } |
| 302 | |
| 303 | if (port == 0) { |
| 304 | return; |
| 305 | } |
| 306 | |
| 307 | cf_sock_cfg sock_cfg; |
| 308 | |
| 309 | cf_sock_cfg_init(&sock_cfg, owner); |
| 310 | sock_cfg.port = port; |
| 311 | cf_ip_addr_set_local(&sock_cfg.addr); |
| 312 | |
| 313 | if (cf_serv_cfg_add_sock_cfg(serv_cfg, &sock_cfg) < 0) { |
| 314 | cf_crash(AS_SERVICE, "couldn't add localhost listening address" ); |
| 315 | } |
| 316 | } |
| 317 | |
| 318 | |
| 319 | //========================================================== |
| 320 | // Local helpers - accept client connections. |
| 321 | // |
| 322 | |
| 323 | static void* |
| 324 | run_accept(void* udata) |
| 325 | { |
| 326 | (void)udata; |
| 327 | |
| 328 | cf_poll poll; |
| 329 | cf_poll_create(&poll); |
| 330 | |
| 331 | cf_poll_add_sockets(poll, &g_sockets, EPOLLIN); |
| 332 | |
| 333 | while (true) { |
| 334 | cf_poll_event events[N_EVENTS]; |
| 335 | int32_t n_events = cf_poll_wait(poll, events, N_EVENTS, -1); |
| 336 | |
| 337 | cf_assert(n_events >= 0, AS_SERVICE, "unexpected EINTR" ); |
| 338 | |
| 339 | for (uint32_t i = 0; i < (uint32_t)n_events; i++) { |
| 340 | cf_socket* ssock = events[i].data; |
| 341 | cf_socket csock; |
| 342 | cf_sock_addr caddr; |
| 343 | |
| 344 | if (cf_socket_accept(ssock, &csock, &caddr) < 0) { |
| 345 | if (errno == EMFILE || errno == ENFILE) { |
| 346 | cf_ticker_warning(AS_SERVICE, "out of file descriptors" ); |
| 347 | continue; |
| 348 | } |
| 349 | |
| 350 | cf_crash(AS_SERVICE, "accept() failed: %d (%s)" , errno, |
| 351 | cf_strerror(errno)); |
| 352 | } |
| 353 | |
| 354 | cf_sock_cfg* cfg = ssock->cfg; |
| 355 | |
| 356 | // Ensure that proto_connections_closed is read first. |
| 357 | uint64_t n_closed = g_stats.proto_connections_closed; |
| 358 | uint64_t n_opened = g_stats.proto_connections_opened; |
| 359 | uint64_t n_open = n_opened - n_closed; |
| 360 | |
| 361 | // TODO - XDR exemption to become a special feature. |
| 362 | if (n_open >= g_config.n_proto_fd_max && |
| 363 | cfg->owner != CF_SOCK_OWNER_XDR) { |
| 364 | cf_ticker_warning(AS_SERVICE, |
| 365 | "refusing client connection - proto-fd-max %u" , |
| 366 | g_config.n_proto_fd_max); |
| 367 | |
| 368 | cf_socket_close(&csock); |
| 369 | cf_socket_term(&csock); |
| 370 | continue; |
| 371 | } |
| 372 | |
| 373 | if (cfg->owner == CF_SOCK_OWNER_SERVICE_TLS) { |
| 374 | tls_socket_prepare_server(g_service_tls, &csock); |
| 375 | } |
| 376 | |
| 377 | as_file_handle* fd_h = cf_rc_alloc(sizeof(as_file_handle)); |
| 378 | // Ref for epoll instance. |
| 379 | |
| 380 | cf_sock_addr_to_string_safe(&caddr, fd_h->client, |
| 381 | sizeof(fd_h->client)); |
| 382 | cf_socket_copy(&csock, &fd_h->sock); |
| 383 | |
| 384 | fd_h->last_used = cf_getns(); |
| 385 | fd_h->in_transaction = 0; |
| 386 | fd_h->move_me = false; |
| 387 | fd_h->reap_me = false; |
| 388 | fd_h->is_xdr = false; |
| 389 | fd_h->proto = NULL; |
| 390 | fd_h->proto_unread = sizeof(as_proto); |
| 391 | fd_h->security_filter = as_security_filter_create(); |
| 392 | |
| 393 | cf_rc_reserve(fd_h); // ref for reaper |
| 394 | |
| 395 | cf_mutex_lock(&g_reaper_lock); |
| 396 | |
| 397 | uint32_t slot; |
| 398 | |
| 399 | if (cf_queue_pop(&g_free_slots, &slot, CF_QUEUE_NOWAIT) != |
| 400 | CF_QUEUE_OK) { |
| 401 | cf_crash(AS_SERVICE, "cannot get free slot" ); |
| 402 | } |
| 403 | |
| 404 | g_file_handles[slot] = fd_h; |
| 405 | |
| 406 | cf_mutex_unlock(&g_reaper_lock); |
| 407 | |
| 408 | assign_socket(fd_h, EPOLLIN); // needs to be armed (EPOLLIN) |
| 409 | |
| 410 | cf_atomic64_incr(&g_stats.proto_connections_opened); |
| 411 | } |
| 412 | } |
| 413 | |
| 414 | return NULL; |
| 415 | } |
| 416 | |
| 417 | |
| 418 | //========================================================== |
| 419 | // Local helpers - assign client connections to threads. |
| 420 | // |
| 421 | |
| 422 | static void |
| 423 | assign_socket(as_file_handle* fd_h, uint32_t events) |
| 424 | { |
| 425 | while (true) { |
| 426 | uint32_t sid; |
| 427 | |
| 428 | switch (g_config.auto_pin) { |
| 429 | case CF_TOPO_AUTO_PIN_NONE: |
| 430 | sid = select_sid(); |
| 431 | break; |
| 432 | case CF_TOPO_AUTO_PIN_CPU: |
| 433 | case CF_TOPO_AUTO_PIN_NUMA: |
| 434 | sid = select_sid_pinned(cf_topo_socket_cpu(&fd_h->sock)); |
| 435 | break; |
| 436 | case CF_TOPO_AUTO_PIN_ADQ: |
| 437 | sid = select_sid_adq(cf_topo_socket_napi_id(&fd_h->sock)); |
| 438 | break; |
| 439 | default: |
| 440 | cf_crash(AS_SERVICE, "bad auto-pin %d" , g_config.auto_pin); |
| 441 | return; |
| 442 | } |
| 443 | |
| 444 | cf_mutex_lock(&g_thread_locks[sid]); |
| 445 | |
| 446 | thread_ctx* ctx = g_thread_ctxs[sid]; |
| 447 | |
| 448 | if (ctx != NULL) { |
| 449 | fd_h->poll = ctx->poll; |
| 450 | |
| 451 | cf_poll_add_socket(fd_h->poll, &fd_h->sock, |
| 452 | events | EPOLLONESHOT | EPOLLRDHUP, fd_h); |
| 453 | |
| 454 | cf_mutex_unlock(&g_thread_locks[sid]); |
| 455 | break; |
| 456 | } |
| 457 | |
| 458 | cf_mutex_unlock(&g_thread_locks[sid]); |
| 459 | } |
| 460 | } |
| 461 | |
| 462 | static uint32_t |
| 463 | select_sid(void) |
| 464 | { |
| 465 | static uint32_t rr = 0; |
| 466 | |
| 467 | return rr++ % g_config.n_service_threads; |
| 468 | } |
| 469 | |
| 470 | static uint32_t |
| 471 | select_sid_pinned(cf_topo_cpu_index i_cpu) |
| 472 | { |
| 473 | static uint32_t rr[CPU_SETSIZE] = { 0 }; |
| 474 | |
| 475 | uint16_t n_cpus = cf_topo_count_cpus(); |
| 476 | uint32_t threads_per_cpu = g_config.n_service_threads / n_cpus; |
| 477 | |
| 478 | uint32_t thread_ix = rr[i_cpu]++ % threads_per_cpu; |
| 479 | |
| 480 | return (thread_ix * n_cpus) + i_cpu; |
| 481 | } |
| 482 | |
| 483 | static uint32_t |
| 484 | select_sid_adq(cf_topo_napi_id id) |
| 485 | { |
| 486 | return id == 0 ? select_sid() : id % g_config.n_service_threads; |
| 487 | } |
| 488 | |
| 489 | static void |
| 490 | schedule_redistribution(void) |
| 491 | { |
| 492 | cf_mutex_lock(&g_reaper_lock); |
| 493 | |
| 494 | uint32_t n_remaining = g_n_slots - (uint32_t)cf_queue_sz(&g_free_slots); |
| 495 | |
| 496 | for (uint32_t i = 0; n_remaining != 0; i++) { |
| 497 | as_file_handle* fd_h = g_file_handles[i]; |
| 498 | |
| 499 | if (fd_h != NULL) { |
| 500 | fd_h->move_me = true; |
| 501 | n_remaining--; |
| 502 | } |
| 503 | } |
| 504 | |
| 505 | cf_mutex_unlock(&g_reaper_lock); |
| 506 | } |
| 507 | |
| 508 | |
| 509 | //========================================================== |
| 510 | // Local helpers - demarshal client requests. |
| 511 | // |
| 512 | |
// Service-thread body. Waits on this thread's poll instance, which holds both
// client sockets (registered one-shot) and the internal transaction queue's
// event fd. Queue events run internal transactions. Socket events complete any
// TLS handshake, read as much of the current request as is available, and
// start the transaction once a full request is in hand. Returns (after
// stop_service()) when the queue yields the terminator sent by
// as_service_set_threads().
static void*
run_service(void* udata)
{
	thread_ctx* ctx = (thread_ctx*)udata;

	cf_detail(AS_SERVICE, "running ctx %p", ctx);

	if (as_config_is_cpu_pinned()) {
		cf_topo_pin_to_cpu(ctx->i_cpu);
	}

	cf_poll poll = ctx->poll;
	cf_epoll_queue* trans_q = &ctx->trans_q;

	// The queue's event fd shares the poll with client sockets - it is told
	// apart by its data pointer (trans_q instead of a file handle).
	cf_poll_add_fd(poll, trans_q->event_fd, EPOLLIN, trans_q);

	while (true) {
		cf_poll_event events[N_EVENTS];
		int32_t n_events = cf_poll_wait(poll, events, N_EVENTS, -1);

		cf_assert(n_events >= 0, AS_SERVICE, "unexpected EINTR");

		for (uint32_t i = 0; i < (uint32_t)n_events; i++) {
			if (events[i].data == trans_q) {
				cf_assert(events[i].events == EPOLLIN, AS_SERVICE,
						"unexpected event: 0x%0x", events[i].events);

				if (start_internal_transaction(ctx)) {
					continue;
				}

				// Popped the terminator - hand off our sockets and exit.
				stop_service(ctx);

				return NULL;
			}

			as_file_handle* fd_h = events[i].data;

			// Peer hang-up or socket error - drop the connection.
			if ((events[i].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) != 0) {
				service_release_file_handle(fd_h);
				continue;
			}

			if (tls_socket_needs_handshake(&fd_h->sock)) {
				int32_t tls_ev = tls_socket_accept(&fd_h->sock);

				if (tls_ev == EPOLLERR) {
					service_release_file_handle(fd_h);
					continue;
				}

				// 0 - presumably handshake finished - so wait for the first
				// request (EPOLLIN).
				if (tls_ev == 0) {
					tls_socket_must_not_have_data(&fd_h->sock,
							"service handshake");
					tls_ev = EPOLLIN;
				}

				// Rearm for whatever event the handshake needs next.
				cf_poll_modify_socket(fd_h->poll, &fd_h->sock,
						(uint32_t)tls_ev | EPOLLONESHOT | EPOLLRDHUP, fd_h);
				continue;
			}

			// Nothing of this request read yet - stamp its start time.
			if (fd_h->proto == NULL && fd_h->proto_unread == sizeof(as_proto)) {
				fd_h->last_used = cf_getns(); // request start time - for now
			}

			if (! process_readable(fd_h)) {
				service_release_file_handle(fd_h);
				continue;
			}

			tls_socket_must_not_have_data(&fd_h->sock, "full client read");

			// Request still incomplete - rearm and wait for more data.
			if (fd_h->proto_unread != 0) {
				as_service_rearm(fd_h);
				continue;
			}

			// Redistribution was requested (schedule_redistribution()) -
			// re-register the socket, unarmed, on a freshly selected thread.
			// The end-of-transaction rearm then arms it on its new poll.
			if (fd_h->move_me) {
				cf_poll_delete_socket(fd_h->poll, &fd_h->sock);
				assign_socket(fd_h, 0); // known to be unarmed (no EPOLLIN)

				fd_h->move_me = false;
			}

			// Note that epoll cannot trigger again for this file handle during
			// the transaction. We'll rearm at the end of the transaction.
			start_transaction(fd_h);
		}
	}

	return NULL;
}
| 606 | |
| 607 | static void |
| 608 | stop_service(thread_ctx* ctx) |
| 609 | { |
| 610 | cf_detail(AS_SERVICE, "stopping ctx %p" , ctx); |
| 611 | |
| 612 | while (true) { |
| 613 | bool any_in_transaction = false; |
| 614 | |
| 615 | cf_mutex_lock(&g_reaper_lock); |
| 616 | |
| 617 | uint32_t n_remaining = g_n_slots - (uint32_t)cf_queue_sz(&g_free_slots); |
| 618 | |
| 619 | for (uint32_t i = 0; n_remaining != 0; i++) { |
| 620 | as_file_handle* fd_h = g_file_handles[i]; |
| 621 | |
| 622 | if (fd_h == NULL) { |
| 623 | continue; |
| 624 | } |
| 625 | |
| 626 | n_remaining--; |
| 627 | |
| 628 | // Ignore, if another thread's or INVALID_POLL. |
| 629 | if (! cf_poll_equal(fd_h->poll, ctx->poll)) { |
| 630 | continue; |
| 631 | } |
| 632 | |
| 633 | // Don't transfer during TLS handshake - might need EPOLLOUT. |
| 634 | if (tls_socket_needs_handshake(&fd_h->sock)) { |
| 635 | service_release_file_handle(fd_h); |
| 636 | continue; |
| 637 | } |
| 638 | |
| 639 | if (fd_h->in_transaction != 0) { |
| 640 | any_in_transaction = true; |
| 641 | continue; |
| 642 | } |
| 643 | |
| 644 | cf_poll_delete_socket(fd_h->poll, &fd_h->sock); |
| 645 | assign_socket(fd_h, EPOLLIN); // known to be armed (EPOLLIN) |
| 646 | } |
| 647 | |
| 648 | cf_mutex_unlock(&g_reaper_lock); |
| 649 | |
| 650 | if (! any_in_transaction) { |
| 651 | break; |
| 652 | } |
| 653 | |
| 654 | sleep(1); |
| 655 | } |
| 656 | |
| 657 | cf_poll_destroy(ctx->poll); |
| 658 | cf_epoll_queue_destroy(&ctx->trans_q); |
| 659 | |
| 660 | cf_free(ctx); |
| 661 | |
| 662 | cf_detail(AS_SERVICE, "stopped ctx %p" , ctx); |
| 663 | } |
| 664 | |
// Drop a connection from the service side: unregister its socket from the
// owning poll, mark it as on no poll (INVALID_POLL), flag it for the reaper,
// and release the epoll instance's reference. The reaper later frees the slot
// and drops its own reference.
static void
service_release_file_handle(as_file_handle* fd_h)
{
	cf_poll_delete_socket(fd_h->poll, &fd_h->sock);
	fd_h->poll = INVALID_POLL;
	fd_h->reap_me = true;
	as_release_file_handle(fd_h);
}
| 673 | |
// Read as much of fd_h's current request as the socket yields without
// blocking. A request is read in two phases:
// - first the fixed-size as_proto header, into fd_h->proto_hdr;
// - then, once the header is validated and byte-swapped, the body, into a
//   newly allocated fd_h->proto.
// fd_h->proto_unread counts the bytes still missing in the current phase.
//
// Returns true if the connection is still usable: either the entire request
// has been read (proto != NULL, proto_unread == 0) or the socket would block.
// Returns false on EOF, a receive error, or a bad header (TLS on a plain
// socket, unsupported version/type, oversized body). The caller then releases
// the connection.
static bool
process_readable(as_file_handle* fd_h)
{
	// End of the current phase's buffer - reads land at end - proto_unread.
	uint8_t* end = fd_h->proto == NULL ?
			(uint8_t*)&fd_h->proto_hdr + sizeof(as_proto) : // header
			fd_h->proto->body + fd_h->proto->sz; // body

	while (true) {
		int32_t sz = cf_socket_recv(&fd_h->sock, end - fd_h->proto_unread,
				fd_h->proto_unread, 0);

		// Would block - keep the connection. Any other error - drop it.
		if (sz < 0) {
			return errno == EAGAIN || errno == EWOULDBLOCK;
		}

		// Nothing read - presumably an orderly shutdown by the peer.
		if (sz == 0) {
			return false;
		}

		fd_h->proto_unread -= (uint64_t)sz;

		if (fd_h->proto_unread != 0) {
			continue; // drain socket (and OpenSSL's internal buffer) dry
		}

		if (fd_h->proto != NULL) {
			return true; // done with entire request
		}
		// else - switch from header to body.

		// Check for a TLS ClientHello arriving at a non-TLS socket. Heuristic:
		// - tls[0] == ContentType.handshake (22)
		// - tls[1] == ProtocolVersion.major (3)
		// - tls[5] == HandshakeType.client_hello (1)

		uint8_t* tls = (uint8_t*)&fd_h->proto_hdr;

		if (tls[0] == 22 && tls[1] == 3 && tls[5] == 1) {
			cf_warning(AS_SERVICE, "ignoring TLS connection from %s",
					fd_h->client);
			return false;
		}

		// For backward compatibility, allow version 0 with security messages.
		if (fd_h->proto_hdr.version != PROTO_VERSION &&
				! (fd_h->proto_hdr.version == 0 &&
						fd_h->proto_hdr.type == PROTO_TYPE_SECURITY)) {
			cf_warning(AS_SERVICE, "unsupported proto version %d from %s",
					fd_h->proto_hdr.version, fd_h->client);
			return false;
		}

		if (! as_proto_is_valid_type(&fd_h->proto_hdr)) {
			cf_warning(AS_SERVICE, "unsupported proto type %d from %s",
					fd_h->proto_hdr.type, fd_h->client);
			return false;
		}

		// Header arrives in network order - convert before using sz.
		as_proto_swap(&fd_h->proto_hdr);

		if (fd_h->proto_hdr.sz > PROTO_SIZE_MAX) {
			cf_warning(AS_SERVICE, "invalid proto size %lu from %s",
					(uint64_t)fd_h->proto_hdr.sz, fd_h->client);
			return false;
		}

		// NOTE(review): a header whose sz is 0 leaves proto_unread at 0, so the
		// next iteration issues a zero-length cf_socket_recv(). If that returns
		// 0 (as recv() does for a zero length), the connection is dropped as
		// if closed instead of the request completing - confirm intended.
		fd_h->proto = cf_malloc(sizeof(as_proto) + fd_h->proto_hdr.sz);
		memcpy(fd_h->proto, &fd_h->proto_hdr, sizeof(as_proto));

		fd_h->proto_unread = fd_h->proto->sz;
		end = fd_h->proto->body + fd_h->proto->sz;
	}
}
| 747 | |
// Dispatch the complete request held in fd_h->proto. The proto buffer is handed
// to the transaction and fd_h is reset to expect the next header. Routing:
// - info requests go to as_info();
// - security requests go to as_security_transact();
// - compressed requests are inflated and re-validated first;
// - batch requests are queued via as_batch_queue_task();
// - everything else is prepared and run via as_tsvc_process_transaction().
// Demarshal failures are reported through as_transaction_demarshal_error().
static void
start_transaction(as_file_handle* fd_h)
{
	// as_end_of_transaction() rearms then decrements, so this may be > 1.
	as_incr_uint32(&fd_h->in_transaction);

	uint64_t start_ns = fd_h->last_used;
	as_proto* proto = fd_h->proto;

	// Detach the request from fd_h so the next header can be read.
	fd_h->proto = NULL;
	fd_h->proto_unread = sizeof(as_proto);

	if (proto->type == PROTO_TYPE_INFO) {
		as_info_transaction it = {
			.fd_h = fd_h,
			.proto = proto,
			.start_time = start_ns
		};

		as_info(&it);
		return;
	}

	as_transaction tr;
	as_transaction_init_head(&tr, NULL, (cl_msg*)proto);

	tr.origin = FROM_CLIENT;
	tr.from.proto_fd_h = fd_h;
	tr.start_time = start_ns;

	if (proto->type == PROTO_TYPE_SECURITY) {
		as_security_transact(&tr);
		return;
	}

	if (proto->type == PROTO_TYPE_AS_MSG_COMPRESSED) {
		uint8_t* buf = NULL;
		uint64_t buf_sz = 0;

		if (! decompress_msg((as_comp_proto*)proto, &buf, &buf_sz)) {
			as_transaction_demarshal_error(&tr, AS_ERR_UNKNOWN);
			return;
		}

		// Replace the compressed request with the inflated one.
		cf_free(proto);

		proto = (as_proto*)buf;
		tr.msgp = (cl_msg*)proto;

		as_proto_swap(proto);

		if (! as_proto_wrapped_is_valid(proto, buf_sz)) {
			cf_warning(AS_SERVICE, "decompressed proto: (%d,%d,%lu,%lu)",
					proto->version, proto->type, (uint64_t)proto->sz, buf_sz);
			as_transaction_demarshal_error(&tr, AS_ERR_UNKNOWN);
			return;
		}
	}

	// NOTE(review): as_transaction_is_xdr() and the info1 read below inspect
	// the message before as_transaction_prepare() validates it. This file
	// checks no minimum body size - confirm that short bodies cannot reach
	// here.

	// First XDR transaction on this connection - tune the socket for it.
	if (as_transaction_is_xdr(&tr) && ! fd_h->is_xdr) {
		config_xdr_socket(&fd_h->sock);
		fd_h->is_xdr = true;
	}

	if (g_config.svc_benchmarks_enabled) {
		tr.benchmark_time = histogram_insert_data_point(
				g_stats.svc_demarshal_hist, start_ns);
	}

	if (tr.msgp->msg.info1 & AS_MSG_INFO1_BATCH) {
		as_batch_queue_task(&tr);
		return;
	}

	if (! as_transaction_prepare(&tr, true)) {
		as_transaction_demarshal_error(&tr, AS_ERR_PARAMETER);
		return;
	}

	as_tsvc_process_transaction(&tr);
}
| 829 | |
| 830 | static bool |
| 831 | decompress_msg(as_comp_proto* cproto, uint8_t** out_buf, uint64_t* out_buf_sz) |
| 832 | { |
| 833 | uint64_t orig_sz = cproto->orig_sz; |
| 834 | |
| 835 | // Hack to handle both little and big endian formats. Some clients wrongly |
| 836 | // send the size in little-endian format. If we interpret a legal big-endian |
| 837 | // size as little-endian, it will be > PROTO_SIZE_MAX. Use it as a clue. |
| 838 | if (orig_sz > PROTO_SIZE_MAX) { |
| 839 | orig_sz = cf_swap_from_be64(cproto->orig_sz); |
| 840 | |
| 841 | if (orig_sz > PROTO_SIZE_MAX) { |
| 842 | cf_warning(AS_SERVICE, "bad compressed packet size %lu" , orig_sz); |
| 843 | return false; |
| 844 | } |
| 845 | } |
| 846 | |
| 847 | uint8_t* decomp_buf = cf_malloc(orig_sz); |
| 848 | uint64_t decomp_buf_sz = orig_sz; |
| 849 | uint64_t comp_buf_sz = cproto->proto.sz - sizeof(cproto->orig_sz); |
| 850 | int rv = uncompress(decomp_buf, &decomp_buf_sz, cproto->data, comp_buf_sz); |
| 851 | |
| 852 | if (rv != Z_OK) { |
| 853 | cf_warning(AS_SERVICE, "zlib decompression failed with error %d" , rv); |
| 854 | cf_free(decomp_buf); |
| 855 | return false; |
| 856 | } |
| 857 | |
| 858 | if (orig_sz != decomp_buf_sz) { |
| 859 | cf_warning(AS_SERVICE, "decompressed size %lu is not expected size %lu" , |
| 860 | decomp_buf_sz, orig_sz); |
| 861 | cf_free(decomp_buf); |
| 862 | return false; |
| 863 | } |
| 864 | |
| 865 | *out_buf = decomp_buf; |
| 866 | *out_buf_sz = decomp_buf_sz; |
| 867 | |
| 868 | return true; |
| 869 | } |
| 870 | |
// Tune a client socket that has been seen to carry XDR transactions: larger
// receive buffer and window (XDR_READ_BUFFER_SIZE), larger send buffer
// (XDR_WRITE_BUFFER_SIZE), and Nagle enabled.
static void
config_xdr_socket(cf_socket* sock)
{
	cf_socket_set_receive_buffer(sock, XDR_READ_BUFFER_SIZE);
	cf_socket_set_send_buffer(sock, XDR_WRITE_BUFFER_SIZE);
	cf_socket_set_window(sock, XDR_READ_BUFFER_SIZE);
	cf_socket_enable_nagle(sock);
}
| 879 | |
| 880 | |
| 881 | //========================================================== |
| 882 | // Local helpers - reap idle and bad connections. |
| 883 | // |
| 884 | |
| 885 | static void |
| 886 | start_reaper(void) |
| 887 | { |
| 888 | struct rlimit rl; |
| 889 | |
| 890 | if (getrlimit(RLIMIT_NOFILE, &rl) < 0) { |
| 891 | cf_crash(AS_SERVICE, "getrlimit() failed: %s" , cf_strerror(errno)); |
| 892 | } |
| 893 | |
| 894 | g_n_slots = (uint32_t)rl.rlim_cur; |
| 895 | g_file_handles = cf_calloc(g_n_slots, sizeof(as_file_handle*)); |
| 896 | |
| 897 | cf_queue_init(&g_free_slots, sizeof(uint32_t), g_n_slots, false); |
| 898 | |
| 899 | for (uint32_t i = 0; i < g_n_slots; i++) { |
| 900 | cf_queue_push(&g_free_slots, &i); |
| 901 | } |
| 902 | |
| 903 | cf_info(AS_SERVICE, "starting reaper thread" ); |
| 904 | |
| 905 | cf_thread_create_detached(run_reaper, NULL); |
| 906 | } |
| 907 | |
// Reaper-thread body. Once a second, with g_reaper_lock held, visits every
// occupied slot in g_file_handles and:
// - applies as_security_refresh() to the handle if as_security_should_refresh()
//   asked for it this round;
// - frees the slot of any handle flagged reap_me and drops the reaper's ref;
// - otherwise, for handles not in a transaction, shuts down and frees the slot
//   of any idle longer than proto-fd-idle-ms (0 disables idle reaping),
//   counting them in g_stats.reaper_count.
// Never returns.
static void*
run_reaper(void* udata)
{
	(void)udata;

	while (true) {
		sleep(1);

		bool security_refresh = as_security_should_refresh();

		// Idle timeout in ns - 0 means idle connections are never reaped.
		uint64_t kill_ns = (uint64_t)g_config.proto_fd_idle_ms * 1000000;
		uint64_t now_ns = cf_getns();

		cf_mutex_lock(&g_reaper_lock);

		// Occupied slots = all slots minus free ones. Stop once all are seen.
		uint32_t n_remaining = g_n_slots - (uint32_t)cf_queue_sz(&g_free_slots);

		for (uint32_t i = 0; n_remaining != 0; i++) {
			as_file_handle* fd_h = g_file_handles[i];

			if (fd_h == NULL) {
				continue;
			}

			n_remaining--;

			if (security_refresh) {
				as_security_refresh(fd_h);
			}

			// reap_me overrides do_not_reap.
			if (fd_h->reap_me) {
				g_file_handles[i] = NULL;
				cf_queue_push_head(&g_free_slots, &i);
				as_release_file_handle(fd_h); // drop the reaper's ref
				continue;
			}

			// Never reap a connection with a transaction in flight.
			if (fd_h->in_transaction != 0) {
				continue;
			}

			if (kill_ns != 0 && fd_h->last_used + kill_ns < now_ns) {
				cf_socket_shutdown(&fd_h->sock); // will trigger epoll errors

				g_file_handles[i] = NULL;
				cf_queue_push_head(&g_free_slots, &i);
				as_release_file_handle(fd_h);

				g_stats.reaper_count++;
			}
		}

		cf_mutex_unlock(&g_reaper_lock);
	}

	return NULL;
}
| 966 | |
| 967 | |
| 968 | //========================================================== |
| 969 | // Local helpers - transaction queue. |
| 970 | // |
| 971 | |
| 972 | static bool |
| 973 | start_internal_transaction(thread_ctx* ctx) |
| 974 | { |
| 975 | as_transaction tr; |
| 976 | |
| 977 | cf_mutex_lock(ctx->lock); |
| 978 | |
| 979 | if (! cf_epoll_queue_pop(&ctx->trans_q, &tr)) { |
| 980 | cf_crash(AS_SERVICE, "unable to pop from transaction queue" ); |
| 981 | } |
| 982 | |
| 983 | cf_mutex_unlock(ctx->lock); |
| 984 | |
| 985 | if (tr.msgp == NULL) { |
| 986 | return false; |
| 987 | } |
| 988 | |
| 989 | if (g_config.svc_benchmarks_enabled && |
| 990 | tr.benchmark_time != 0 && ! as_transaction_is_restart(&tr)) { |
| 991 | histogram_insert_data_point(g_stats.svc_queue_hist, tr.benchmark_time); |
| 992 | } |
| 993 | |
| 994 | as_tsvc_process_transaction(&tr); |
| 995 | |
| 996 | return true; |
| 997 | } |
| 998 | |