1/*
2 * fabric.c
3 *
4 * Copyright (C) 2008-2018 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23// Object Management:
24// ------------------
25//
// Node and FC objects are reference counted. Correct bookkeeping of object
// references is vital to system operation.
28//
29// Holders of FC references:
30// (1) node->fc_hash
31// (2) node->send_idle_fc_queue
32// (3) (epoll_event ev).data.ptr
33//
34// For sending, (2) and (3) are mutually exclusive.
// Refs between (2) and (3) are handed off implicitly whenever possible,
// without explicit reserve/release calls.
37// (3) takes ref on rearm.
38// (3) gives ref to calling thread when epoll triggers, due to ONESHOT.
39// Thread will either rearm or give ref to (2). Never do both.
40//
// FCs are created in two places: fabric_node_connect() and run_fabric_accept().
42//
43// Holders of Node references:
44// * fc->node
45// * g_fabric.node_hash
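//
// Sketch of the sending-side ONESHOT ref hand-off described above (hypothetical
// handler, shown only to make the convention concrete; node and ch are
// illustrative locals):
//
//   fabric_connection *fc = ev.data.ptr;   // thread now owns the ref (3) held
//
//   if (fc->s_msg_in_progress) {
//       fabric_connection_send_rearm(fc);  // ref passes back to (3)
//   }
//   else {
//       cf_queue_push(&node->send_idle_fc_queue[ch], &fc); // ref passes to (2)
//   }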
46
47
48//==========================================================
49// Includes.
50//
51
52#include "fabric/fabric.h"
53
54#include <errno.h>
55#include <stdbool.h>
56#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
58#include <string.h>
59#include <unistd.h>
60
61#include "citrusleaf/alloc.h"
62#include "citrusleaf/cf_atomic.h"
63#include "citrusleaf/cf_byte_order.h"
64#include "citrusleaf/cf_clock.h"
65#include "citrusleaf/cf_queue.h"
66#include "citrusleaf/cf_vector.h"
67
68#include "cf_mutex.h"
69#include "cf_thread.h"
70#include "fault.h"
71#include "msg.h"
72#include "node.h"
73#include "rchash.h"
74#include "shash.h"
75#include "socket.h"
76#include "tls.h"
77
78#include "base/cfg.h"
79#include "base/health.h"
80#include "base/stats.h"
81#include "fabric/endpoint.h"
82#include "fabric/hb.h"
83
84
85//==========================================================
86// Typedefs & constants.
87//
88
89#define FABRIC_SEND_MEM_SZ (1024) // bytes
90#define FABRIC_BUFFER_MEM_SZ (1024 * 1024) // bytes
91#define FABRIC_BUFFER_MAX_SZ (128 * 1024 * 1024) // used simply for validation
92#define FABRIC_EPOLL_SEND_EVENTS 16
93#define FABRIC_EPOLL_RECV_EVENTS 1
94
95typedef enum {
96 // These values go on the wire, so mind backward compatibility if changing.
97 FS_FIELD_NODE,
98 FS_UNUSED1, // used to be FS_ADDR
99 FS_UNUSED2, // used to be FS_PORT
100 FS_UNUSED3, // used to be FS_ANV
101 FS_UNUSED4, // used to be FS_ADDR_EX
102 FS_CHANNEL,
103
104 NUM_FS_FIELDS
105} fs_msg_fields;
106
107static const msg_template fabric_mt[] = {
108 { FS_FIELD_NODE, M_FT_UINT64 },
109 { FS_UNUSED1, M_FT_UINT32 },
110 { FS_UNUSED2, M_FT_UINT32 },
111 { FS_UNUSED3, M_FT_BUF },
112 { FS_UNUSED4, M_FT_BUF },
113 { FS_CHANNEL, M_FT_UINT32 },
114};
115
116COMPILER_ASSERT(sizeof(fabric_mt) / sizeof(msg_template) == NUM_FS_FIELDS);
117
118#define FS_MSG_SCRATCH_SIZE 0
119
120#define DEFAULT_EVENTS (EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLONESHOT)
121
122// Block size for allocating fabric hb plugin data.
123#define HB_PLUGIN_DATA_BLOCK_SIZE 128
124
125typedef struct fabric_recv_thread_pool_s {
126 cf_vector threads;
127 cf_poll poll;
128 uint32_t pool_id;
129} fabric_recv_thread_pool;
130
131typedef struct send_entry_s {
132 struct send_entry_s *next;
133 uint32_t id;
134 uint32_t count;
135 cf_poll poll;
136} send_entry;
137
138typedef struct fabric_state_s {
139 as_fabric_msg_fn msg_cb[M_TYPE_MAX];
140 void *msg_udata[M_TYPE_MAX];
141
142 fabric_recv_thread_pool recv_pool[AS_FABRIC_N_CHANNELS];
143
144 cf_mutex send_lock;
145 send_entry *sends;
146 send_entry *send_head;
147
148 cf_mutex node_hash_lock;
149 cf_rchash *node_hash; // key is cf_node, value is (fabric_node *)
150} fabric_state;
151
152typedef struct fabric_node_s {
153 cf_node node_id; // remote node
154 bool live; // set to false on shutdown
155 uint32_t connect_count[AS_FABRIC_N_CHANNELS];
156 bool connect_full;
157
158 cf_mutex connect_lock;
159
160 cf_mutex fc_hash_lock;
161 cf_shash *fc_hash; // key is (fabric_connection *), value unused
162
163 cf_mutex send_idle_fc_queue_lock;
164 cf_queue send_idle_fc_queue[AS_FABRIC_N_CHANNELS];
165
166 cf_queue send_queue[AS_FABRIC_N_CHANNELS];
167
168 uint8_t send_counts[];
169} fabric_node;
170
171typedef struct fabric_connection_s {
172 cf_socket sock;
173 cf_sock_addr peer;
174 fabric_node *node;
175
176 bool failed;
177 bool started_via_connect;
178
179 bool s_cork_bypass;
180 int s_cork;
181 uint8_t s_buf[FABRIC_SEND_MEM_SZ];
182 struct iovec *s_iov;
183 size_t s_iov_count;
184 uint32_t s_sz;
185 uint32_t s_msg_sz;
186 msg *s_msg_in_progress;
187 size_t s_count;
188
189 uint8_t *r_bigbuf;
190 uint8_t r_buf[FABRIC_BUFFER_MEM_SZ + sizeof(msg_hdr)];
191 uint32_t r_rearm_count;
192 msg_type r_type;
193 uint32_t r_sz;
194 uint32_t r_buf_sz;
195 uint64_t benchmark_time;
196
	// A non-NULL send_ptr means the FC's sock is registered with a send poll.
	// This is needed because epoll's API doesn't allow registering
199 // a socket without event triggers (ERR and HUP are enabled even when
200 // unspecified).
201 send_entry *send_ptr;
202 fabric_recv_thread_pool *pool;
203
204 uint64_t s_bytes;
205 uint64_t s_bytes_last;
206 uint64_t r_bytes;
207 uint64_t r_bytes_last;
208} fabric_connection;
209
210typedef struct node_list_s {
211 uint32_t count;
212 cf_node nodes[AS_CLUSTER_SZ]; // must support the maximum cluster size.
213} node_list;
214
215const char *CHANNEL_NAMES[] = {
216 [AS_FABRIC_CHANNEL_RW] = "rw",
217 [AS_FABRIC_CHANNEL_CTRL] = "ctrl",
218 [AS_FABRIC_CHANNEL_BULK] = "bulk",
219 [AS_FABRIC_CHANNEL_META] = "meta",
220};
221
222COMPILER_ASSERT(sizeof(CHANNEL_NAMES) / sizeof(const char *) ==
223 AS_FABRIC_N_CHANNELS);
224
225const bool channel_nagle[] = {
226 [AS_FABRIC_CHANNEL_RW] = false,
227 [AS_FABRIC_CHANNEL_CTRL] = false,
228 [AS_FABRIC_CHANNEL_BULK] = true,
229 [AS_FABRIC_CHANNEL_META] = false,
230};
231
232COMPILER_ASSERT(sizeof(channel_nagle) / sizeof(bool) == AS_FABRIC_N_CHANNELS);
233
234
235//==========================================================
236// Globals.
237//
238
239cf_serv_cfg g_fabric_bind = { .n_cfgs = 0 };
240cf_tls_info *g_fabric_tls;
241
242static fabric_state g_fabric;
243static cf_poll g_accept_poll;
244
245static as_endpoint_list *g_published_endpoint_list;
246static bool g_published_endpoint_list_ipv4_only;
247
248// Max connections formed via connect. Others are formed via accept.
249static uint32_t g_fabric_connect_limit[AS_FABRIC_N_CHANNELS];
250
251
252//==========================================================
253// Forward declarations.
254//
255
256// Support functions.
257static void send_entry_insert(send_entry **se_pp, send_entry *se);
258
259static void fabric_published_serv_cfg_fill(const cf_serv_cfg *bind_cfg, cf_serv_cfg *published_cfg, bool ipv4_only);
260static bool fabric_published_endpoints_refresh(void);
261
262// fabric_node
263static fabric_node *fabric_node_create(cf_node node_id);
264static fabric_node *fabric_node_get(cf_node node_id);
265static fabric_node *fabric_node_get_or_create(cf_node node_id);
266static fabric_node *fabric_node_pop(cf_node node_id);
267static int fabric_node_disconnect_reduce_fn(const void *key, void *data, void *udata);
268static void fabric_node_disconnect(cf_node node_id);
269
270static fabric_connection *fabric_node_connect(fabric_node *node, uint32_t ch);
271static int fabric_node_send(fabric_node *node, msg *m, as_fabric_channel channel);
272static void fabric_node_connect_all(fabric_node *node);
273static void fabric_node_destructor(void *pnode);
274inline static void fabric_node_reserve(fabric_node *node);
275inline static void fabric_node_release(fabric_node *node);
276static bool fabric_node_add_connection(fabric_node *node, fabric_connection *fc);
277static uint8_t fabric_node_find_min_send_count(const fabric_node *node);
278static bool fabric_node_is_connect_full(const fabric_node *node);
279
280static int fabric_get_node_list_fn(const void *key, uint32_t keylen, void *data, void *udata);
281static uint32_t fabric_get_node_list(node_list *nl);
282
283// fabric_connection
284fabric_connection *fabric_connection_create(cf_socket *sock, cf_sock_addr *peer);
285static bool fabric_connection_accept_tls(fabric_connection *fc);
286static bool fabric_connection_connect_tls(fabric_connection *fc);
287inline static void fabric_connection_reserve(fabric_connection *fc);
288static void fabric_connection_release(fabric_connection *fc);
289inline static cf_node fabric_connection_get_id(const fabric_connection *fc);
290
291inline static void fabric_connection_cork(fabric_connection *fc);
292inline static void fabric_connection_uncork(fabric_connection *fc);
293static void fabric_connection_send_assign(fabric_connection *fc);
294static void fabric_connection_send_unassign(fabric_connection *fc);
295inline static void fabric_connection_recv_rearm(fabric_connection *fc);
296inline static void fabric_connection_send_rearm(fabric_connection *fc);
297static void fabric_connection_disconnect(fabric_connection *fc);
298static void fabric_connection_set_keepalive_options(fabric_connection *fc);
299
300static void fabric_connection_reroute_msg(fabric_connection *fc);
301static bool fabric_connection_send_progress(fabric_connection *fc);
302static bool fabric_connection_process_writable(fabric_connection *fc);
303
304static bool fabric_connection_process_fabric_msg(fabric_connection *fc, const msg *m);
305static bool fabric_connection_read_fabric_msg(fabric_connection *fc);
306
307static bool fabric_connection_process_msg(fabric_connection *fc, bool do_rearm);
308static bool fabric_connection_process_readable(fabric_connection *fc);
309
310// fabric_recv_thread_pool
311static void fabric_recv_thread_pool_init(fabric_recv_thread_pool *pool, uint32_t size, uint32_t pool_id);
312static void fabric_recv_thread_pool_set_size(fabric_recv_thread_pool *pool, uint32_t size);
313static void fabric_recv_thread_pool_add_fc(fabric_recv_thread_pool *pool, fabric_connection *fc);
314
315// fabric_endpoint
316static bool fabric_endpoint_list_get(cf_node nodeid, as_endpoint_list *endpoint_list, size_t *endpoint_list_size);
317static bool fabric_connect_endpoint_filter(const as_endpoint *endpoint, void *udata);
318
319// Thread functions.
320static void *run_fabric_recv(void *arg);
321static void *run_fabric_send(void *arg);
322static void *run_fabric_accept(void *arg);
323
324// Ticker helpers.
325static int fabric_rate_node_reduce_fn(const void *key, uint32_t keylen, void *data, void *udata);
326static int fabric_rate_fc_reduce_fn(const void *key, void *data, void *udata);
327
328// Heartbeat.
329static void fabric_hb_plugin_set_fn(msg *m);
330static void fabric_hb_plugin_parse_data_fn(msg *m, cf_node source, as_hb_plugin_node_data *prev_plugin_data, as_hb_plugin_node_data *plugin_data);
331static void fabric_heartbeat_event(int nevents, as_hb_event_node *events, void *udata);
332
333
334//==========================================================
335// Public API.
336//
337
338//------------------------------------------------
339// msg
340//
341
342// Log information about existing "msg" objects and queues.
343void
344as_fabric_msg_queue_dump()
345{
346 cf_info(AS_FABRIC, "All currently-existing msg types:");
347
348 int total_q_sz = 0;
349 int total_alloced_msgs = 0;
350
351 for (int i = 0; i < M_TYPE_MAX; i++) {
352 int num_of_type = cf_atomic_int_get(g_num_msgs_by_type[i]);
353
354 total_alloced_msgs += num_of_type;
355
356 if (num_of_type) {
			cf_info(AS_FABRIC, "msg type %d: alloc'd = %d", i, num_of_type);
358 }
359 }
360
361 int num_msgs = cf_atomic_int_get(g_num_msgs);
362
363 if (abs(num_msgs - total_alloced_msgs) > 2) {
364 cf_warning(AS_FABRIC, "num msgs (%d) != total alloc'd msgs (%d)", num_msgs, total_alloced_msgs);
365 }
366
367 cf_info(AS_FABRIC, "Total num. msgs = %d ; Total num. queued = %d ; Delta = %d", num_msgs, total_q_sz, num_msgs - total_q_sz);
368}
369
370//------------------------------------------------
371// as_fabric
372//
373
374void
375as_fabric_init()
376{
377 for (uint32_t i = 0; i < AS_FABRIC_N_CHANNELS; i++) {
378 g_fabric_connect_limit[i] = g_config.n_fabric_channel_fds[i];
379
380 fabric_recv_thread_pool_init(&g_fabric.recv_pool[i],
381 g_config.n_fabric_channel_recv_threads[i], i);
382 }
383
384 cf_mutex_init(&g_fabric.send_lock);
385
386 as_fabric_register_msg_fn(M_TYPE_FABRIC, fabric_mt, sizeof(fabric_mt),
387 FS_MSG_SCRATCH_SIZE, NULL, NULL);
388
389 cf_mutex_init(&g_fabric.node_hash_lock);
390
391 g_fabric.node_hash = cf_rchash_create(cf_nodeid_rchash_fn,
392 fabric_node_destructor, sizeof(cf_node), 128, 0);
393
394 g_published_endpoint_list = NULL;
395 g_published_endpoint_list_ipv4_only = cf_ip_addr_legacy_only();
396
397 if (! fabric_published_endpoints_refresh()) {
398 cf_crash(AS_FABRIC, "error creating fabric published endpoint list");
399 }
400
401 as_hb_plugin fabric_plugin;
402
403 memset(&fabric_plugin, 0, sizeof(fabric_plugin));
404 fabric_plugin.id = AS_HB_PLUGIN_FABRIC;
405 fabric_plugin.wire_size_fixed = 0; // includes the size for the protocol version
406 as_endpoint_list_sizeof(g_published_endpoint_list,
407 &fabric_plugin.wire_size_fixed);
	fabric_plugin.wire_size_per_node = 0; // size per node in succession list
409 fabric_plugin.set_fn = fabric_hb_plugin_set_fn;
410 fabric_plugin.parse_fn = fabric_hb_plugin_parse_data_fn;
411 fabric_plugin.change_listener = NULL;
412 as_hb_plugin_register(&fabric_plugin);
413
414 as_hb_register_listener(fabric_heartbeat_event, &g_fabric);
415}
416
417void
418as_fabric_start()
419{
420 g_fabric.sends =
421 cf_malloc(sizeof(send_entry) * g_config.n_fabric_send_threads);
422 g_fabric.send_head = g_fabric.sends;
423
424 cf_info(AS_FABRIC, "starting %u fabric send threads", g_config.n_fabric_send_threads);
425
426 for (int i = 0; i < g_config.n_fabric_send_threads; i++) {
427 cf_poll_create(&g_fabric.sends[i].poll);
428 g_fabric.sends[i].id = i;
429 g_fabric.sends[i].count = 0;
430 g_fabric.sends[i].next = g_fabric.sends + i + 1;
431
432 cf_thread_create_detached(run_fabric_send, (void*)&g_fabric.sends[i]);
433 }
434
435 g_fabric.sends[g_config.n_fabric_send_threads - 1].next = NULL;
436
437 for (uint32_t i = 0; i < AS_FABRIC_N_CHANNELS; i++) {
438 cf_info(AS_FABRIC, "starting %u fabric %s channel recv threads", g_config.n_fabric_channel_recv_threads[i], CHANNEL_NAMES[i]);
439
440 fabric_recv_thread_pool_set_size(&g_fabric.recv_pool[i],
441 g_config.n_fabric_channel_recv_threads[i]);
442 }
443
444 cf_info(AS_FABRIC, "starting fabric accept thread");
445
446 cf_thread_create_detached(run_fabric_accept, NULL);
447}
448
449void
450as_fabric_set_recv_threads(as_fabric_channel channel, uint32_t count)
451{
452 g_config.n_fabric_channel_recv_threads[channel] = count;
453
454 fabric_recv_thread_pool_set_size(&g_fabric.recv_pool[channel], count);
455}
456
457int
458as_fabric_send(cf_node node_id, msg *m, as_fabric_channel channel)
459{
460 m->benchmark_time = g_config.fabric_benchmarks_enabled ? cf_getns() : 0;
461
462 if (g_config.self_node == node_id) {
463 cf_assert(g_fabric.msg_cb[m->type], AS_FABRIC, "m->type %d not registered", m->type);
464 (g_fabric.msg_cb[m->type])(node_id, m, g_fabric.msg_udata[m->type]);
465
466 return AS_FABRIC_SUCCESS;
467 }
468
469 fabric_node *node = fabric_node_get(node_id);
470 int ret = fabric_node_send(node, m, channel);
471
472 if (node) {
473 fabric_node_release(node); // from fabric_node_get
474 }
475
476 return ret;
477}
478
479int
480as_fabric_send_list(const cf_node *nodes, uint32_t node_count, msg *m,
481 as_fabric_channel channel)
482{
483 cf_assert(nodes && node_count != 0, AS_FABRIC, "nodes list null or empty");
484
485 // TODO - if we implement an out-of-scope response when sending to self,
486 // remove this deferral.
487 bool send_self = false;
488
489 for (uint32_t i = 0; i < node_count; i++) {
490 if (nodes[i] == g_config.self_node) {
491 send_self = true;
492 continue;
493 }
494
495 msg_incr_ref(m);
496
497 int ret = as_fabric_send(nodes[i], m, channel);
498
499 if (ret != AS_FABRIC_SUCCESS) {
			as_fabric_msg_put(m); // release the extra reference added above
			return ret; // caller still owns (and releases) the main reference
502 }
503 }
504
505 if (send_self) {
506 // Shortcut - use main reference for fabric.
507 return as_fabric_send(g_config.self_node, m, channel);
508 }
509
510 as_fabric_msg_put(m); // release main reference
511
512 return AS_FABRIC_SUCCESS;
513}
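
// Example caller pattern for the reference contract above (sketch only - the
// msg type and field are illustrative):
//
//   msg *m = as_fabric_msg_get(M_TYPE_RW);   // caller owns the main reference
//
//   msg_set_uint32(m, EXAMPLE_FIELD, value); // hypothetical field
//
//   if (as_fabric_send_list(nodes, n_nodes, m, AS_FABRIC_CHANNEL_RW) !=
//           AS_FABRIC_SUCCESS) {
//       as_fabric_msg_put(m); // failure - caller still owns the main reference
//   }
//   // Success - the fabric layer consumed the main reference.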
514
515int
516as_fabric_retransmit(cf_node node_id, msg *m, as_fabric_channel channel)
517{
518 // This function assumes the sender holds only a single reference to the
519 // msg. Do not use this function when there may be more than one reference
520 // to an unsent msg.
521
522 if (cf_rc_count(m) > 1) {
523 // Msg should already be in the fabric queue - success.
524 return AS_FABRIC_SUCCESS;
525 }
526
527 msg_incr_ref(m);
528
529 int err = as_fabric_send(node_id, m, channel);
530
531 if (err != AS_FABRIC_SUCCESS) {
532 as_fabric_msg_put(m);
533 return err;
534 }
535
536 return AS_FABRIC_SUCCESS;
537}
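
// Typical retransmit pattern (sketch - names are hypothetical): the owning
// module keeps exactly one reference to its unacknowledged msg and calls this
// on timeout:
//
//   if (now > request->deadline) {
//       as_fabric_retransmit(request->dest, request->m, AS_FABRIC_CHANNEL_RW);
//   }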
538
539// TODO - make static registration
540void
541as_fabric_register_msg_fn(msg_type type, const msg_template *mt, size_t mt_sz,
542 size_t scratch_sz, as_fabric_msg_fn msg_cb, void *msg_udata)
543{
544 msg_type_register(type, mt, mt_sz, scratch_sz);
545
546 g_fabric.msg_cb[type] = msg_cb;
547 g_fabric.msg_udata[type] = msg_udata;
548}
549
550void
551as_fabric_info_peer_endpoints_get(cf_dyn_buf *db)
552{
553 node_list nl;
554 fabric_get_node_list(&nl);
555
556 for (uint32_t i = 0; i < nl.count; i++) {
557 if (nl.nodes[i] == g_config.self_node) {
558 continue;
559 }
560
561 fabric_node *node = fabric_node_get(nl.nodes[i]);
562
563 if (! node) {
564 cf_info(AS_FABRIC, "\tnode %lx not found in hash although reported available", nl.nodes[i]);
565 continue;
566 }
567
568 size_t endpoint_list_capacity = 1024;
569 bool retry = true;
570
571 while (true) {
572 uint8_t stack_mem[endpoint_list_capacity];
573 as_endpoint_list *endpoint_list = (as_endpoint_list *)stack_mem;
574
575 if (! fabric_endpoint_list_get(node->node_id, endpoint_list,
576 &endpoint_list_capacity)) {
577 if (errno == ENOENT) {
578 // No entry present for this node in heartbeat.
579 cf_detail(AS_FABRIC, "could not get endpoint list for %lx", node->node_id);
580 break;
581 }
582
583 if (! retry) {
584 break;
585 }
586
587 retry = false;
588 continue;
589 }
590
591 cf_dyn_buf_append_string(db, "fabric.peer=");
592 cf_dyn_buf_append_string(db, "node-id=");
593 cf_dyn_buf_append_uint64_x(db, node->node_id);
594 cf_dyn_buf_append_string(db, ":");
595 as_endpoint_list_info(endpoint_list, db);
596 cf_dyn_buf_append_string(db, ";");
597 break;
598 }
599
600 fabric_node_release(node);
601 }
602}
603
604bool
605as_fabric_is_published_endpoint_list(const as_endpoint_list *list)
606{
607 return as_endpoint_lists_are_equal(g_published_endpoint_list, list);
608}
609
610// Used by heartbeat subsystem only, for duplicate node-id detection.
611as_endpoint_list *
612as_fabric_hb_plugin_get_endpoint_list(as_hb_plugin_node_data *plugin_data)
613{
614 return (plugin_data && plugin_data->data_size != 0) ?
615 (as_endpoint_list *)plugin_data->data : NULL;
616}
617
618void
619as_fabric_rate_capture(fabric_rate *rate)
620{
621 cf_mutex_lock(&g_fabric.node_hash_lock);
622 cf_rchash_reduce(g_fabric.node_hash, fabric_rate_node_reduce_fn, rate);
623 cf_mutex_unlock(&g_fabric.node_hash_lock);
624}
625
626void
627as_fabric_dump(bool verbose)
628{
629 node_list nl;
630 fabric_get_node_list(&nl);
631
632 cf_info(AS_FABRIC, " Fabric Dump: nodes known %d", nl.count);
633
634 for (uint32_t i = 0; i < nl.count; i++) {
635 if (nl.nodes[i] == g_config.self_node) {
636 cf_info(AS_FABRIC, "\tnode %lx is self", nl.nodes[i]);
637 continue;
638 }
639
640 fabric_node *node = fabric_node_get(nl.nodes[i]);
641
642 if (! node) {
643 cf_info(AS_FABRIC, "\tnode %lx not found in hash although reported available", nl.nodes[i]);
644 continue;
645 }
646
647 cf_mutex_lock(&node->fc_hash_lock);
648 cf_info(AS_FABRIC, "\tnode %lx fds {via_connect={h=%d m=%d l=%d} all=%d} live %d q {h=%d m=%d l=%d}",
649 node->node_id,
650 node->connect_count[AS_FABRIC_CHANNEL_CTRL],
651 node->connect_count[AS_FABRIC_CHANNEL_RW],
652 node->connect_count[AS_FABRIC_CHANNEL_BULK],
653 cf_shash_get_size(node->fc_hash), node->live,
654 cf_queue_sz(&node->send_queue[AS_FABRIC_CHANNEL_CTRL]),
655 cf_queue_sz(&node->send_queue[AS_FABRIC_CHANNEL_RW]),
656 cf_queue_sz(&node->send_queue[AS_FABRIC_CHANNEL_BULK]));
657 cf_mutex_unlock(&node->fc_hash_lock);
658
659 fabric_node_release(node); // node_get
660 }
661}
662
663
664//==========================================================
665// Support functions.
666//
667
668static void
669send_entry_insert(send_entry **se_pp, send_entry *se)
670{
671 while (*se_pp && se->count > (*se_pp)->count) {
672 se_pp = &(*se_pp)->next;
673 }
674
675 se->next = *se_pp;
676 *se_pp = se;
677}
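
// e.g. send_entry_insert() of an entry with count 3 into a list whose counts
// are 1 -> 2 -> 4 yields 1 -> 2 -> 3 -> 4 - the list stays sorted by count, so
// the least-busy send thread is always reachable from g_fabric.send_head.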
678
679// Get addresses to publish as serv config. Expand "any" addresses.
680static void
681fabric_published_serv_cfg_fill(const cf_serv_cfg *bind_cfg,
682 cf_serv_cfg *published_cfg, bool ipv4_only)
683{
684 cf_serv_cfg_init(published_cfg);
685
686 cf_sock_cfg sock_cfg;
687
688 for (int i = 0; i < bind_cfg->n_cfgs; i++) {
689 cf_sock_cfg_copy(&bind_cfg->cfgs[i], &sock_cfg);
690
691 // Expand "any" address to all interfaces.
692 if (cf_ip_addr_is_any(&sock_cfg.addr)) {
693 cf_ip_addr all_addrs[CF_SOCK_CFG_MAX];
694 uint32_t n_all_addrs = CF_SOCK_CFG_MAX;
695
696 if (cf_inter_get_addr_all(all_addrs, &n_all_addrs) != 0) {
697 cf_warning(AS_FABRIC, "error getting all interface addresses");
698 n_all_addrs = 0;
699 }
700
701 for (int j = 0; j < n_all_addrs; j++) {
702 // Skip local address if any is specified.
703 if (cf_ip_addr_is_local(&all_addrs[j]) ||
704 (ipv4_only && ! cf_ip_addr_is_legacy(&all_addrs[j]))) {
705 continue;
706 }
707
708 cf_ip_addr_copy(&all_addrs[j], &sock_cfg.addr);
709
710 if (cf_serv_cfg_add_sock_cfg(published_cfg, &sock_cfg)) {
711 cf_crash(AS_FABRIC, "error initializing published address list");
712 }
713 }
714 }
715 else {
716 if (ipv4_only && ! cf_ip_addr_is_legacy(&bind_cfg->cfgs[i].addr)) {
717 continue;
718 }
719
720 if (cf_serv_cfg_add_sock_cfg(published_cfg, &sock_cfg)) {
721 cf_crash(AS_FABRIC, "error initializing published address list");
722 }
723 }
724 }
725}
726
727// Refresh the fabric published endpoint list.
728// Return true on success.
729static bool
730fabric_published_endpoints_refresh()
731{
732 if (g_published_endpoint_list &&
733 g_published_endpoint_list_ipv4_only == cf_ip_addr_legacy_only()) {
734 return true;
735 }
736
	// The global flag has changed - refresh the published address list.
738 if (g_published_endpoint_list) {
739 // Free the obsolete list.
740 cf_free(g_published_endpoint_list);
741 }
742
743 cf_serv_cfg published_cfg;
744 fabric_published_serv_cfg_fill(&g_fabric_bind, &published_cfg,
745 g_published_endpoint_list_ipv4_only);
746
747 g_published_endpoint_list = as_endpoint_list_from_serv_cfg(&published_cfg);
748 cf_assert(g_published_endpoint_list, AS_FABRIC, "error initializing mesh published address list");
749
750 g_published_endpoint_list_ipv4_only = cf_ip_addr_legacy_only();
751
752 if (g_published_endpoint_list->n_endpoints == 0) {
753 if (g_published_endpoint_list_ipv4_only) {
754 cf_warning(AS_FABRIC, "no IPv4 addresses configured for fabric");
755 }
756 else {
757 cf_warning(AS_FABRIC, "no addresses configured for fabric");
758 }
759
760 return false;
761 }
762
763 char endpoint_list_str[512];
764 as_endpoint_list_to_string(g_published_endpoint_list, endpoint_list_str,
765 sizeof(endpoint_list_str));
766
767 cf_info(AS_FABRIC, "updated fabric published address list to {%s}", endpoint_list_str);
768
769 return true;
770}
771
772
773//==========================================================
774// fabric_node
775//
776
777static fabric_node *
778fabric_node_create(cf_node node_id)
779{
780 size_t size = sizeof(fabric_node) +
781 (sizeof(uint8_t) * g_config.n_fabric_send_threads);
782 fabric_node *node = cf_rc_alloc(size);
783
784 memset(node, 0, size);
785
786 node->node_id = node_id;
787 node->live = true;
788
789 cf_mutex_init(&node->send_idle_fc_queue_lock);
790
791 for (int i = 0; i < AS_FABRIC_N_CHANNELS; i++) {
792 cf_queue_init(&node->send_idle_fc_queue[i], sizeof(fabric_connection *),
793 CF_QUEUE_ALLOCSZ, false);
794
795 cf_queue_init(&node->send_queue[i], sizeof(msg *), CF_QUEUE_ALLOCSZ,
796 true);
797 }
798
799 cf_mutex_init(&node->connect_lock);
800 cf_mutex_init(&node->fc_hash_lock);
801
802 node->fc_hash = cf_shash_create(cf_shash_fn_ptr,
803 sizeof(fabric_connection *), 0, 32, 0);
804
805 cf_detail(AS_FABRIC, "fabric_node_create(%lx) node %p", node_id, node);
806
807 return node;
808}
809
810static fabric_node *
811fabric_node_get(cf_node node_id)
812{
813 fabric_node *node;
814
815 cf_mutex_lock(&g_fabric.node_hash_lock);
816 int rv = cf_rchash_get(g_fabric.node_hash, &node_id, sizeof(cf_node),
817 (void **)&node);
818 cf_mutex_unlock(&g_fabric.node_hash_lock);
819
820 if (rv != CF_RCHASH_OK) {
821 return NULL;
822 }
823
824 return node;
825}
826
827static fabric_node *
828fabric_node_get_or_create(cf_node node_id)
829{
830 fabric_node *node;
831
832 cf_mutex_lock(&g_fabric.node_hash_lock);
833
834 if (cf_rchash_get(g_fabric.node_hash, &node_id, sizeof(cf_node),
835 (void **)&node) == CF_RCHASH_OK) {
836 cf_mutex_unlock(&g_fabric.node_hash_lock);
837
838 fabric_node_connect_all(node);
839
840 return node;
841 }
842
843 node = fabric_node_create(node_id);
844
845 if (cf_rchash_put_unique(g_fabric.node_hash, &node_id, sizeof(cf_node),
846 node) != CF_RCHASH_OK) {
847 cf_crash(AS_FABRIC, "fabric_node_get_or_create(%lx)", node_id);
848 }
849
850 fabric_node_reserve(node); // for return
851
852 cf_mutex_unlock(&g_fabric.node_hash_lock);
853
854 fabric_node_connect_all(node);
855
856 return node;
857}
858
859static fabric_node *
860fabric_node_pop(cf_node node_id)
861{
862 fabric_node *node = NULL;
863
864 cf_mutex_lock(&g_fabric.node_hash_lock);
865
866 if (cf_rchash_get(g_fabric.node_hash, &node_id, sizeof(cf_node),
867 (void **)&node) == CF_RCHASH_OK) {
868 if (cf_rchash_delete(g_fabric.node_hash, &node_id, sizeof(node_id)) !=
869 CF_RCHASH_OK) {
870 cf_crash(AS_FABRIC, "fabric_node_pop(%lx)", node_id);
871 }
872 }
873
874 cf_mutex_unlock(&g_fabric.node_hash_lock);
875
876 return node;
877}
878
879static int
880fabric_node_disconnect_reduce_fn(const void *key, void *data, void *udata)
881{
882 fabric_connection *fc = *(fabric_connection **)key;
883
884 cf_assert(fc, AS_FABRIC, "fc == NULL, don't put NULLs into fc_hash");
885 cf_socket_shutdown(&fc->sock);
886 fabric_connection_release(fc); // for delete from node->fc_hash
887
888 return CF_SHASH_REDUCE_DELETE;
889}
890
891static void
892fabric_node_disconnect(cf_node node_id)
893{
894 fabric_node *node = fabric_node_pop(node_id);
895
896 if (! node) {
897 cf_warning(AS_FABRIC, "fabric_node_disconnect(%lx) not connected", node_id);
898 return;
899 }
900
901 cf_info(AS_FABRIC, "fabric_node_disconnect(%lx)", node_id);
902
903 cf_mutex_lock(&node->fc_hash_lock);
904
905 node->live = false;
906 // Clean up all fc's attached to this node.
907 cf_shash_reduce(node->fc_hash, fabric_node_disconnect_reduce_fn, NULL);
908
909 cf_mutex_unlock(&node->fc_hash_lock);
910
911 cf_mutex_lock(&node->send_idle_fc_queue_lock);
912
913 for (int i = 0; i < AS_FABRIC_N_CHANNELS; i++) {
914 while (true) {
915 fabric_connection *fc;
916
917 int rv = cf_queue_pop(&node->send_idle_fc_queue[i], &fc,
918 CF_QUEUE_NOWAIT);
919
920 if (rv != CF_QUEUE_OK) {
921 break;
922 }
923
924 fabric_connection_send_unassign(fc);
925 fabric_connection_release(fc);
926 }
927 }
928
929 cf_mutex_unlock(&node->send_idle_fc_queue_lock);
930
931 fabric_node_release(node); // from fabric_node_pop()
932}
933
934static fabric_connection *
935fabric_node_connect(fabric_node *node, uint32_t ch)
936{
937 cf_detail(AS_FABRIC, "fabric_node_connect(%p, %u)", node, ch);
938
939 cf_mutex_lock(&node->connect_lock);
940
941 uint32_t fds = node->connect_count[ch] + 1;
942
943 if (fds > g_fabric_connect_limit[ch]) {
944 cf_mutex_unlock(&node->connect_lock);
945 return NULL;
946 }
947
948 cf_socket sock;
949 cf_sock_addr addr;
950 size_t endpoint_list_capacity = 1024;
951 int tries_remaining = 3;
952
953 while (tries_remaining--) {
954 uint8_t endpoint_list_mem[endpoint_list_capacity];
955 as_endpoint_list *endpoint_list = (as_endpoint_list *)endpoint_list_mem;
956
957 if (fabric_endpoint_list_get(node->node_id, endpoint_list,
958 &endpoint_list_capacity)) {
959 char endpoint_list_str[1024];
960
961 as_endpoint_list_to_string(endpoint_list, endpoint_list_str,
962 sizeof(endpoint_list_str));
963 cf_detail(AS_FABRIC, "fabric_node_connect(%p, %u) node_id %lx with endpoints {%s}", node, ch, node->node_id, endpoint_list_str);
964
965 // Initiate connect to the remote endpoint.
966 const as_endpoint *connected_endpoint = as_endpoint_connect_any(
967 endpoint_list, fabric_connect_endpoint_filter, NULL, 0,
968 &sock);
969
970 if (! connected_endpoint) {
971 cf_detail(AS_FABRIC, "fabric_node_connect(%p, %u) node_id %lx failed for endpoints {%s}", node, ch, node->node_id, endpoint_list_str);
972 cf_mutex_unlock(&node->connect_lock);
973 return NULL;
974 }
975
976 as_endpoint_to_sock_addr(connected_endpoint, &addr);
977
978 if (as_endpoint_capability_is_supported(connected_endpoint,
979 AS_ENDPOINT_TLS_MASK)) {
980 tls_socket_prepare_client(g_fabric_tls, &sock);
981 }
982
983 break; // read success
984 }
985
986 if (errno == ENOENT) {
987 // No entry present for this node in heartbeat.
988 cf_detail(AS_FABRIC, "fabric_node_connect(%p, %u) unknown remote node %lx", node, ch, node->node_id);
989 cf_mutex_unlock(&node->connect_lock);
990 return NULL;
991 }
992
993 // The list capacity was not enough. Retry with suggested list size.
994 }
995
996 if (tries_remaining < 0) {
		cf_warning(AS_FABRIC, "fabric_node_connect(%p, %u) endpoint list get error for remote node %lx", node, ch, node->node_id);
998 cf_mutex_unlock(&node->connect_lock);
999 return NULL;
1000 }
1001
1002 msg *m = as_fabric_msg_get(M_TYPE_FABRIC);
1003
1004 cf_atomic64_incr(&g_stats.fabric_connections_opened);
1005 as_health_add_node_counter(node->node_id, AS_HEALTH_NODE_FABRIC_FDS);
1006
1007 msg_set_uint64(m, FS_FIELD_NODE, g_config.self_node);
1008 msg_set_uint32(m, FS_CHANNEL, ch);
1009 m->benchmark_time = g_config.fabric_benchmarks_enabled ? cf_getns() : 0;
1010
1011 fabric_connection *fc = fabric_connection_create(&sock, &addr);
1012
1013 fc->s_msg_in_progress = m;
1014 fc->started_via_connect = true;
1015 fc->pool = &g_fabric.recv_pool[ch];
1016
1017 if (! fabric_node_add_connection(node, fc)) {
1018 fabric_connection_release(fc);
1019 cf_mutex_unlock(&node->connect_lock);
1020 return NULL;
1021 }
1022
1023 node->connect_count[ch]++;
1024 node->connect_full = fabric_node_is_connect_full(node);
1025
1026 cf_mutex_unlock(&node->connect_lock);
1027
1028 return fc;
1029}
1030
1031static int
1032fabric_node_send(fabric_node *node, msg *m, as_fabric_channel channel)
1033{
1034 if (! node || ! node->live) {
1035 return AS_FABRIC_ERR_NO_NODE;
1036 }
1037
1038 while (true) {
1039 // Sync with fabric_connection_process_writable() to avoid non-empty
1040 // send_queue with every fc being in send_idle_fc_queue.
1041 cf_mutex_lock(&node->send_idle_fc_queue_lock);
1042
1043 fabric_connection *fc;
1044 int rv = cf_queue_pop(&node->send_idle_fc_queue[(int)channel], &fc,
1045 CF_QUEUE_NOWAIT);
1046
1047 if (rv != CF_QUEUE_OK) {
1048 cf_queue_push(&node->send_queue[(int)channel], &m);
1049 cf_mutex_unlock(&node->send_idle_fc_queue_lock);
1050
1051 if (! node->connect_full) {
1052 fabric_node_connect_all(node);
1053 }
1054
1055 break;
1056 }
1057
1058 cf_mutex_unlock(&node->send_idle_fc_queue_lock);
1059
1060 if ((! cf_socket_exists(&fc->sock)) || fc->failed) {
1061 fabric_connection_send_unassign(fc);
1062 fabric_connection_release(fc); // send_idle_fc_queue
1063 continue;
1064 }
1065
1066 fc->s_msg_in_progress = m;
1067
1068 // Wake up.
1069 if (fc->send_ptr) {
1070 fabric_connection_send_rearm(fc); // takes fc ref
1071 }
1072 else {
1073 fabric_connection_send_assign(fc); // takes fc ref
1074 }
1075
1076 break;
1077 }
1078
1079 return AS_FABRIC_SUCCESS;
1080}
1081
1082static void
1083fabric_node_connect_all(fabric_node *node)
1084{
1085 if (! node->live) {
1086 return;
1087 }
1088
1089 for (uint32_t ch = 0; ch < AS_FABRIC_N_CHANNELS; ch++) {
1090 uint32_t n = g_fabric_connect_limit[ch] - node->connect_count[ch];
1091
1092 for (uint32_t i = 0; i < n; i++) {
1093 fabric_connection *fc = fabric_node_connect(node, ch);
1094
1095 if (! fc) {
1096 break;
1097 }
1098
1099 // TLS connections are one-way. Outgoing connections are for
1100 // outgoing data.
1101 if (fc->sock.state == CF_SOCKET_STATE_NON_TLS) {
1102 fabric_recv_thread_pool_add_fc(&g_fabric.recv_pool[ch], fc);
1103 cf_detail(AS_FABRIC, "{%16lX, %u} activated", fabric_connection_get_id(fc), fc->sock.fd);
1104
1105 if (channel_nagle[ch]) {
1106 fc->s_cork_bypass = true;
1107 cf_socket_enable_nagle(&fc->sock);
1108 }
1109 }
1110 else {
1111 fc->s_cork_bypass = true;
1112 }
1113
1114 // Takes the remaining ref for send_poll and idle queue.
1115 fabric_connection_send_assign(fc);
1116 }
1117 }
1118}
1119
1120static void
1121fabric_node_destructor(void *pnode)
1122{
1123 fabric_node *node = (fabric_node *)pnode;
1124 cf_detail(AS_FABRIC, "fabric_node_destructor(%p)", node);
1125
1126 for (int i = 0; i < AS_FABRIC_N_CHANNELS; i++) {
1127 // send_idle_fc_queue section.
1128 cf_assert(cf_queue_sz(&node->send_idle_fc_queue[i]) == 0, AS_FABRIC, "send_idle_fc_queue not empty as expected");
1129 cf_queue_destroy(&node->send_idle_fc_queue[i]);
1130
1131 // send_queue section.
1132 while (true) {
1133 msg *m;
1134
1135 if (cf_queue_pop(&node->send_queue[i], &m, CF_QUEUE_NOWAIT) !=
1136 CF_QUEUE_OK) {
1137 break;
1138 }
1139
1140 as_fabric_msg_put(m);
1141 }
1142
1143 cf_queue_destroy(&node->send_queue[i]);
1144 }
1145
1146 cf_mutex_destroy(&node->send_idle_fc_queue_lock);
1147
1148 // connection_hash section.
1149 cf_assert(cf_shash_get_size(node->fc_hash) == 0, AS_FABRIC, "fc_hash not empty as expected");
1150 cf_shash_destroy(node->fc_hash);
1151
1152 cf_mutex_destroy(&node->fc_hash_lock);
1153}
1154
1155inline static void
1156fabric_node_reserve(fabric_node *node) {
1157 cf_rc_reserve(node);
1158}
1159
1160inline static void
1161fabric_node_release(fabric_node *node)
1162{
1163 if (cf_rc_release(node) == 0) {
1164 fabric_node_destructor(node);
1165 cf_rc_free(node);
1166 }
1167}
1168
1169static bool
1170fabric_node_add_connection(fabric_node *node, fabric_connection *fc)
1171{
1172 cf_mutex_lock(&node->fc_hash_lock);
1173
1174 if (! node->live) {
1175 cf_mutex_unlock(&node->fc_hash_lock);
1176 return false;
1177 }
1178
1179 fabric_node_reserve(node);
1180 fc->node = node;
1181
1182 fabric_connection_set_keepalive_options(fc);
1183 fabric_connection_reserve(fc); // for put into node->fc_hash
1184
1185 uint8_t value = 0;
1186 int rv = cf_shash_put_unique(node->fc_hash, &fc, &value);
1187
1188 cf_assert(rv == CF_SHASH_OK, AS_FABRIC, "fabric_node_add_connection(%p, %p) failed to add with rv %d", node, fc, rv);
1189
1190 cf_mutex_unlock(&node->fc_hash_lock);
1191
1192 return true;
1193}
1194
1195static uint8_t
1196fabric_node_find_min_send_count(const fabric_node *node)
1197{
1198 uint8_t min = node->send_counts[0];
1199
1200 for (uint32_t i = 1; i < g_config.n_fabric_send_threads; i++) {
1201 if (node->send_counts[i] < min) {
1202 min = node->send_counts[i];
1203 }
1204 }
1205
1206 return min;
1207}
1208
1209static bool
1210fabric_node_is_connect_full(const fabric_node *node)
1211{
1212 for (int ch = 0; ch < AS_FABRIC_N_CHANNELS; ch++) {
1213 if (node->connect_count[ch] < g_fabric_connect_limit[ch]) {
1214 return false;
1215 }
1216 }
1217
1218 return true;
1219}
1220
1221
1222static int
1223fabric_get_node_list_fn(const void *key, uint32_t keylen, void *data,
1224 void *udata)
1225{
1226 node_list *nl = (node_list *)udata;
1227
1228 if (nl->count == AS_CLUSTER_SZ) {
1229 return 0;
1230 }
1231
1232 nl->nodes[nl->count] = *(const cf_node *)key;
1233 nl->count++;
1234
1235 return 0;
1236}
1237
1238// Get a list of all the nodes - use a dynamic array, which requires inline.
1239static uint32_t
1240fabric_get_node_list(node_list *nl)
1241{
1242 nl->count = 1;
1243 nl->nodes[0] = g_config.self_node;
1244
1245 cf_mutex_lock(&g_fabric.node_hash_lock);
1246 cf_rchash_reduce(g_fabric.node_hash, fabric_get_node_list_fn, nl);
1247 cf_mutex_unlock(&g_fabric.node_hash_lock);
1248
1249 return nl->count;
1250}
1251
1252
1253//==========================================================
1254// fabric_connection
1255//
1256
1257fabric_connection *
1258fabric_connection_create(cf_socket *sock, cf_sock_addr *peer)
1259{
1260 fabric_connection *fc = cf_rc_alloc(sizeof(fabric_connection));
1261
1262 memset(fc, 0, sizeof(fabric_connection));
1263
1264 cf_socket_copy(sock, &fc->sock);
1265 cf_sock_addr_copy(peer, &fc->peer);
1266
1267 fc->r_type = M_TYPE_FABRIC;
1268
1269 return fc;
1270}
1271
1272static bool
1273fabric_connection_accept_tls(fabric_connection *fc)
1274{
1275 int32_t tls_ev = tls_socket_accept(&fc->sock);
1276
1277 if (tls_ev == EPOLLERR) {
1278 cf_warning(AS_FABRIC, "fabric TLS server handshake with %s failed", cf_sock_addr_print(&fc->peer));
1279 return false;
1280 }
1281
1282 if (tls_ev == 0) {
1283 tls_socket_must_not_have_data(&fc->sock, "fabric server handshake");
1284 tls_ev = EPOLLIN;
1285 }
1286
1287 cf_poll_modify_socket(g_accept_poll, &fc->sock,
1288 tls_ev | EPOLLERR | EPOLLHUP | EPOLLRDHUP, fc);
1289 return true;
1290}
1291
1292static bool
1293fabric_connection_connect_tls(fabric_connection *fc)
1294{
1295 int32_t tls_ev = tls_socket_connect(&fc->sock);
1296
1297 if (tls_ev == EPOLLERR) {
1298 cf_warning(AS_FABRIC, "fabric TLS client handshake with %s failed", cf_sock_addr_print(&fc->peer));
1299 return false;
1300 }
1301
1302 if (tls_ev == 0) {
1303 tls_socket_must_not_have_data(&fc->sock, "fabric client handshake");
1304 tls_ev = EPOLLOUT;
1305 }
1306
1307 cf_poll_modify_socket(fc->send_ptr->poll, &fc->sock,
1308 tls_ev | DEFAULT_EVENTS, fc);
1309 return true;
1310}
1311
1312inline static void
1313fabric_connection_reserve(fabric_connection *fc)
1314{
1315 cf_rc_reserve(fc);
1316}
1317
1318static void
1319fabric_connection_release(fabric_connection *fc)
1320{
1321 if (cf_rc_release(fc) == 0) {
1322 if (fc->s_msg_in_progress) {
1323 // First message (s_count == 0) is initial M_TYPE_FABRIC message
1324 // and does not need to be saved.
1325 if (! fc->started_via_connect || fc->s_count != 0) {
1326 cf_queue_push(&fc->node->send_queue[fc->pool->pool_id],
1327 &fc->s_msg_in_progress);
1328 }
1329 else {
1330 as_fabric_msg_put(fc->s_msg_in_progress);
1331 }
1332 }
1333
1334 if (fc->node) {
1335 fabric_node_release(fc->node);
1336 fc->node = NULL;
1337 }
1338 else {
1339 cf_detail(AS_FABRIC, "releasing fc %p not attached to a node", fc);
1340 }
1341
1342 cf_socket_close(&fc->sock);
1343 cf_socket_term(&fc->sock);
1344 cf_atomic64_incr(&g_stats.fabric_connections_closed);
1345
1346 cf_free(fc->r_bigbuf);
1347 cf_rc_free(fc);
1348 }
1349}
1350
1351inline static cf_node
1352fabric_connection_get_id(const fabric_connection *fc)
1353{
1354 if (fc->node) {
1355 return fc->node->node_id;
1356 }
1357
1358 return 0;
1359}
1360
1361inline static void
1362fabric_connection_cork(fabric_connection *fc)
1363{
1364 if (fc->s_cork == 1 || fc->s_cork_bypass) {
1365 return;
1366 }
1367
1368 fc->s_cork = 1;
1369 cf_socket_set_cork(&fc->sock, fc->s_cork);
1370}
1371
1372inline static void
1373fabric_connection_uncork(fabric_connection *fc)
1374{
1375 if (fc->s_cork == 0 || fc->s_cork_bypass) {
1376 return;
1377 }
1378
1379 fc->s_cork = 0;
1380 cf_socket_set_cork(&fc->sock, fc->s_cork);
1381}
1382
1383// epoll takes the reference of fc.
1384static void
1385fabric_connection_send_assign(fabric_connection *fc)
1386{
1387 cf_mutex_lock(&g_fabric.send_lock);
1388
1389 send_entry **pp = &g_fabric.send_head;
1390 uint8_t min = fabric_node_find_min_send_count(fc->node);
1391
1392 while (true) {
1393 uint32_t send_id = (*pp)->id;
1394
1395 if (fc->node->send_counts[send_id] == min) {
1396 break;
1397 }
1398
1399 cf_assert((*pp)->next, AS_FABRIC, "fabric_connection_send_assign() invalid send_count state");
1400
1401 pp = &(*pp)->next;
1402 }
1403
1404 send_entry *se = *pp;
1405
1406 se->count++;
1407 fc->node->send_counts[se->id]++;
1408
1409 if (se->next && se->next->count < se->count) {
1410 *pp = se->next;
1411 send_entry_insert(pp, se);
1412 }
1413
1414 fc->send_ptr = se;
1415
1416 cf_mutex_unlock(&g_fabric.send_lock);
1417
1418 cf_poll_add_socket(se->poll, &fc->sock, EPOLLOUT | DEFAULT_EVENTS, fc);
1419}
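
// e.g. in fabric_connection_send_assign(), if this node's send_counts are
// {2, 1, 1}, the walk stops at the first send thread whose per-node count
// equals the minimum (1), so a node's connections are spread across its
// least-used send threads before any thread is reused.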
1420
1421static void
1422fabric_connection_send_unassign(fabric_connection *fc)
1423{
1424 cf_mutex_lock(&g_fabric.send_lock);
1425
1426 if (! fc->send_ptr) {
1427 cf_mutex_unlock(&g_fabric.send_lock);
1428 return;
1429 }
1430
1431 send_entry **pp = &g_fabric.send_head;
1432 send_entry *se = fc->send_ptr;
1433
1434 while (*pp != se) {
1435 cf_assert((*pp)->next, AS_FABRIC, "fabric_connection_send_unassign() invalid send_count state");
1436
1437 pp = &(*pp)->next;
1438 }
1439
1440 cf_assert(se->count != 0 || fc->node->send_counts[se->id] != 0, AS_FABRIC, "invalid send_count accounting se %p id %u count %u node send_count %u",
1441 se, se->id, se->count, fc->node->send_counts[se->id]);
1442
1443 se->count--;
1444 fc->node->send_counts[se->id]--;
1445
1446 *pp = se->next;
1447 send_entry_insert(&g_fabric.send_head, se);
1448
1449 fc->send_ptr = NULL;
1450
1451 cf_mutex_unlock(&g_fabric.send_lock);
1452}
1453
1454inline static void
1455fabric_connection_recv_rearm(fabric_connection *fc)
1456{
1457 fc->r_rearm_count++;
1458 cf_poll_modify_socket(fc->pool->poll, &fc->sock,
1459 EPOLLIN | DEFAULT_EVENTS, fc);
1460}
1461
1462// epoll takes the reference of fc.
1463inline static void
1464fabric_connection_send_rearm(fabric_connection *fc)
1465{
1466 cf_poll_modify_socket(fc->send_ptr->poll, &fc->sock,
1467 EPOLLOUT | DEFAULT_EVENTS, fc);
1468}
1469
1470static void
1471fabric_connection_disconnect(fabric_connection *fc)
1472{
1473 fc->failed = true;
1474 cf_socket_shutdown(&fc->sock);
1475
1476 fabric_node *node = fc->node;
1477
1478 if (! node) {
1479 return;
1480 }
1481
1482 cf_mutex_lock(&node->fc_hash_lock);
1483
1484 if (cf_shash_delete(node->fc_hash, &fc) != CF_SHASH_OK) {
1485 cf_detail(AS_FABRIC, "fc %p is not in (node %p)->fc_hash", fc, node);
1486 cf_mutex_unlock(&node->fc_hash_lock);
1487 return;
1488 }
1489
1490 cf_mutex_unlock(&node->fc_hash_lock);
1491
1492 if (fc->started_via_connect) {
1493 cf_mutex_lock(&node->connect_lock);
1494
1495 cf_atomic32_decr(&node->connect_count[fc->pool->pool_id]);
1496 node->connect_full = false;
1497
1498 cf_mutex_unlock(&node->connect_lock);
1499 }
1500
1501 cf_mutex_lock(&node->send_idle_fc_queue_lock);
1502
1503 if (cf_queue_delete(&node->send_idle_fc_queue[fc->pool->pool_id], &fc,
1504 true) == CF_QUEUE_OK) {
1505 fabric_connection_send_unassign(fc);
1506 fabric_connection_release(fc); // for delete from send_idle_fc_queue
1507 }
1508
1509 cf_mutex_unlock(&node->send_idle_fc_queue_lock);
1510
1511 cf_detail(AS_FABRIC, "fabric_connection_disconnect(%p) {pool=%u id=%lx fd=%u}",
1512 fc, fc->pool ? fc->pool->pool_id : 0,
1513 node ? node->node_id : (cf_node)0, fc->sock.fd);
1514
1515 fabric_connection_release(fc); // for delete from node->fc_hash
1516}
1517
1518static void
1519fabric_connection_set_keepalive_options(fabric_connection *fc)
1520{
1521 if (g_config.fabric_keepalive_enabled) {
1522 cf_socket_keep_alive(&fc->sock, g_config.fabric_keepalive_time,
1523 g_config.fabric_keepalive_intvl,
1524 g_config.fabric_keepalive_probes);
1525 }
1526}
1527
1528static void
1529fabric_connection_reroute_msg(fabric_connection *fc)
1530{
1531 if (! fc->s_msg_in_progress) {
1532 return;
1533 }
1534
1535 // Don't reroute initial M_TYPE_FABRIC message.
1536 if ((fc->started_via_connect && fc->s_count == 0) ||
1537 fabric_node_send(fc->node, fc->s_msg_in_progress,
1538 fc->pool->pool_id) != AS_FABRIC_SUCCESS) {
1539 as_fabric_msg_put(fc->s_msg_in_progress);
1540 }
1541
1542 fc->s_msg_in_progress = NULL;
1543}
1544
1545static void
1546fabric_connection_incr_iov(fabric_connection *fc, uint32_t sz)
1547{
1548 while (sz != 0) {
1549 if (sz <= fc->s_iov->iov_len) {
1550 fc->s_iov->iov_base = (uint8_t *)fc->s_iov->iov_base + sz;
1551 fc->s_iov->iov_len -= sz;
1552 return;
1553 }
1554
		cf_assert(fc->s_iov_count != 0, AS_FABRIC, "fc->s_iov_count == 0");
1556 sz -= fc->s_iov->iov_len;
1557 fc->s_iov->iov_len = 0;
1558 fc->s_iov++;
1559 fc->s_iov_count--;
1560 }
1561}
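
// e.g. fabric_connection_incr_iov() with iovec lengths {10, 20} and sz 15 (a
// partial send) consumes and skips the first iovec, then advances the second
// to base + 5 with 15 bytes left - exactly where the next
// cf_socket_send_msg() call resumes.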
1562
1563static bool
1564fabric_connection_send_progress(fabric_connection *fc)
1565{
1566 if (fc->s_msg_sz == 0) { // new msg
1567 msg *m = fc->s_msg_in_progress;
1568
1569 fc->s_iov = (struct iovec *)fc->s_buf;
1570 fc->s_iov_count = msg_to_iov_buf(m, fc->s_buf, sizeof(fc->s_buf),
1571 &fc->s_msg_sz);
1572 fc->s_sz = 0;
1573
1574 if (m->benchmark_time != 0) {
1575 m->benchmark_time = histogram_insert_data_point(
1576 g_stats.fabric_send_init_hists[fc->pool->pool_id],
1577 m->benchmark_time);
1578 }
1579 }
1580
1581 struct msghdr sendhdr = {
1582 .msg_iov = fc->s_iov,
1583 .msg_iovlen = fc->s_iov_count
1584 };
1585
1586 int32_t send_sz = cf_socket_send_msg(&fc->sock, &sendhdr, 0);
1587
1588 if (send_sz < 0) {
1589 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1590 return false;
1591 }
1592
1593 send_sz = 0; // treat as sending 0
1594 }
1595
1596 if (fc->s_msg_in_progress->benchmark_time != 0) {
1597 fc->s_msg_in_progress->benchmark_time = histogram_insert_data_point(
1598 g_stats.fabric_send_fragment_hists[fc->pool->pool_id],
1599 fc->s_msg_in_progress->benchmark_time);
1600 }
1601
1602 fc->s_sz += send_sz;
1603 fc->s_bytes += send_sz;
1604
1605 if (fc->s_sz == fc->s_msg_sz) { // complete send
1606 as_fabric_msg_put(fc->s_msg_in_progress);
1607 fc->s_msg_in_progress = NULL;
1608 fc->s_msg_sz = 0;
1609 fc->s_count++;
1610 }
1611 else { // partial send
1612 fabric_connection_incr_iov(fc, (uint32_t)send_sz);
1613 }
1614
1615 return true;
1616}
1617
1618// Must rearm or place into idle queue on success.
1619static bool
1620fabric_connection_process_writable(fabric_connection *fc)
1621{
1622 fabric_node *node = fc->node;
1623 uint32_t pool = fc->pool->pool_id;
1624
1625 if (! fc->s_msg_in_progress) {
1626 // TODO - Change to load op when atomic API is ready.
		// Also, this should be rare or never happen on x86_64.
1628 cf_warning(AS_FABRIC, "fc(%p)->s_msg_in_progress NULL on entry", fc);
1629 return false;
1630 }
1631
1632 fabric_connection_cork(fc);
1633
1634 while (true) {
1635 if (! fabric_connection_send_progress(fc)) {
1636 return false;
1637 }
1638
1639 if (fc->s_msg_in_progress) {
1640 fabric_connection_send_rearm(fc);
1641 return true;
1642 }
1643
1644 if (cf_queue_pop(&node->send_queue[pool], &fc->s_msg_in_progress,
1645 CF_QUEUE_NOWAIT) != CF_QUEUE_OK) {
1646 break;
1647 }
1648 }
1649
1650 fabric_connection_uncork(fc);
1651
1652 if (! fc->node->live || fc->failed) {
1653 return false;
1654 }
1655
	// Do the final pop under the lock to sync with as_fabric_send().
1657 cf_mutex_lock(&node->send_idle_fc_queue_lock);
1658
1659 if (! fc->node->live || fc->failed) {
1660 cf_mutex_unlock(&node->send_idle_fc_queue_lock);
1661 return false;
1662 }
1663
1664 if (cf_queue_pop(&node->send_queue[pool], &fc->s_msg_in_progress,
1665 CF_QUEUE_NOWAIT) == CF_QUEUE_EMPTY) {
1666 cf_queue_push(&node->send_idle_fc_queue[pool], &fc);
1667 cf_mutex_unlock(&node->send_idle_fc_queue_lock);
1668 return true;
1669 }
1670
1671 cf_mutex_unlock(&node->send_idle_fc_queue_lock);
1672
1673 fabric_connection_send_rearm(fc);
1674
1675 return true;
1676}
1677
1678// Return true on success.
1679static bool
1680fabric_connection_process_fabric_msg(fabric_connection *fc, const msg *m)
1681{
1682 cf_poll_delete_socket(g_accept_poll, &fc->sock);
1683
1684 cf_node node_id;
1685
1686 if (msg_get_uint64(m, FS_FIELD_NODE, &node_id) != 0) {
1687 cf_warning(AS_FABRIC, "process_fabric_msg: failed to read M_TYPE_FABRIC node");
1688 return false;
1689 }
1690
1691 cf_detail(AS_FABRIC, "process_fabric_msg: M_TYPE_FABRIC from node %lx", node_id);
1692
1693 fabric_node *node = fabric_node_get_or_create(node_id);
1694
1695 if (! fabric_node_add_connection(node, fc)) {
1696 fabric_node_release(node); // from cf_rchash_get
1697 return false;
1698 }
1699
1700 uint32_t pool_id = AS_FABRIC_N_CHANNELS; // illegal value
1701
1702 msg_get_uint32(m, FS_CHANNEL, &pool_id);
1703
1704 if (pool_id >= AS_FABRIC_N_CHANNELS) {
1705 fabric_node_release(node); // from cf_rchash_get
1706 return false;
1707 }
1708
1709 fc->r_sz = 0;
1710 fc->r_buf_sz = 0;
1711
1712 // fc->pool needs to be set before placing into send_idle_fc_queue.
1713 fabric_recv_thread_pool_add_fc(&g_fabric.recv_pool[pool_id], fc);
1714
1715 // TLS connections are one-way. Incoming connections are for
1716 // incoming data.
1717 if (fc->sock.state == CF_SOCKET_STATE_NON_TLS) {
1718 if (channel_nagle[pool_id]) {
1719 fc->s_cork_bypass = true;
1720 cf_socket_enable_nagle(&fc->sock);
1721 }
1722
1723 cf_mutex_lock(&node->send_idle_fc_queue_lock);
1724
1725 if (node->live && ! fc->failed) {
1726 fabric_connection_reserve(fc); // for send poll & idleQ
1727
1728 if (cf_queue_pop(&node->send_queue[pool_id], &fc->s_msg_in_progress,
1729 CF_QUEUE_NOWAIT) == CF_QUEUE_EMPTY) {
1730 cf_queue_push(&node->send_idle_fc_queue[pool_id], &fc);
1731 }
1732 else {
1733 fabric_connection_send_assign(fc);
1734 }
1735 }
1736
1737 cf_mutex_unlock(&node->send_idle_fc_queue_lock);
1738 }
1739 else {
1740 fc->s_cork_bypass = true;
1741 }
1742
1743 fabric_node_release(node); // from cf_rchash_get
1744 fabric_connection_release(fc); // from g_accept_poll
1745
1746 return true;
1747}
1748
1749static bool
1750fabric_connection_read_fabric_msg(fabric_connection *fc)
1751{
1752 while (true) {
1753 int32_t recv_sz = cf_socket_recv(&fc->sock, fc->r_buf + fc->r_sz,
1754 sizeof(msg_hdr) + fc->r_buf_sz - fc->r_sz, 0);
1755
1756 if (recv_sz < 0) {
1757 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1758 cf_warning(AS_FABRIC, "fabric_connection_read_fabric_msg() recv_sz %d errno %d %s", recv_sz, errno, cf_strerror(errno));
1759 return false;
1760 }
1761
1762 break;
1763 }
1764
1765 if (recv_sz == 0) {
1766 cf_detail(AS_FABRIC, "fabric_connection_read_fabric_msg(%p) recv_sz 0 r_msg_sz %u", fc, fc->r_buf_sz);
1767 return false;
1768 }
1769
1770 fc->r_sz += recv_sz;
1771 fc->r_bytes += recv_sz;
1772
1773 if (fc->r_buf_sz == 0) {
1774 if (fc->r_sz < sizeof(msg_hdr)) {
1775 tls_socket_must_not_have_data(&fc->sock, "partial fabric read");
1776 break;
1777 }
1778
1779 msg_parse_hdr(&fc->r_buf_sz, &fc->r_type, fc->r_buf, fc->r_sz);
1780
1781 if (fc->r_buf_sz >= sizeof(fc->r_buf)) {
				cf_warning(AS_FABRIC, "r_buf_sz %u >= sizeof(fc->r_buf) %zu", fc->r_buf_sz, sizeof(fc->r_buf));
1783 return false;
1784 }
1785
1786 if (fc->r_buf_sz != 0) {
1787 continue;
1788 }
1789 }
1790
1791 if (fc->r_sz < sizeof(msg_hdr) + fc->r_buf_sz) {
1792 tls_socket_must_not_have_data(&fc->sock, "partial fabric read");
1793 break;
1794 }
1795
1796 tls_socket_must_not_have_data(&fc->sock, "full fabric read");
1797
1798 if (fc->r_type != M_TYPE_FABRIC) {
1799 cf_warning(AS_FABRIC, "fabric_connection_read_fabric_msg() expected type M_TYPE_FABRIC(%d) got type %d", M_TYPE_FABRIC, fc->r_type);
1800 return false;
1801 }
1802
1803 msg *m = as_fabric_msg_get(M_TYPE_FABRIC);
1804
1805 if (! msg_parse_fields(m, fc->r_buf + sizeof(msg_hdr), fc->r_buf_sz)) {
1806 cf_warning(AS_FABRIC, "msg_parse failed for fc %p", fc);
1807 as_fabric_msg_put(m);
1808 return false;
1809 }
1810
1811 bool ret = fabric_connection_process_fabric_msg(fc, m);
1812
1813 as_fabric_msg_put(m);
1814
1815 return ret;
1816 }
1817
1818 return true;
1819}
1820
1821// Return true on success.
1822// Must have re-armed on success if do_rearm == true.
1823static bool
1824fabric_connection_process_msg(fabric_connection *fc, bool do_rearm)
1825{
1826 cf_assert(fc->node, AS_FABRIC, "process_msg: no node assigned");
1827
1828 uint32_t read_ahead_sz = fc->r_sz - fc->r_buf_sz;
1829 uint32_t mem_sz = fc->r_buf_sz;
1830 uint8_t *p_bigbuf = fc->r_bigbuf; // malloc handoff
1831
1832 fc->r_bigbuf = NULL;
1833
1834 if (p_bigbuf) {
1835 fc->r_sz = read_ahead_sz;
1836 mem_sz = 0;
1837 }
1838
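	// Extend mem_sz over every complete msg sitting in r_buf (the current msg,
	// unless it spilled into r_bigbuf, plus any fully read-ahead msgs), so the
	// single stack copy below can process them as a batch; a partially
	// received trailing msg stays in r_buf (via the memmove below) for the
	// next recv to complete.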
1839 while (read_ahead_sz > sizeof(msg_hdr)) {
1840 read_ahead_sz -= sizeof(msg_hdr);
1841
1842 uint32_t *ptr = (uint32_t *)(fc->r_buf + mem_sz);
1843 uint32_t sz = cf_swap_from_be32(*ptr);
1844
1845 if (read_ahead_sz < sz) {
1846 break;
1847 }
1848
1849 mem_sz += sizeof(msg_hdr) + sz;
1850 read_ahead_sz -= sz;
1851 }
1852
1853 uint8_t stack_mem[mem_sz + 1]; // +1 to account for mem_sz == 0
1854
1855 memcpy(stack_mem, fc->r_buf, mem_sz);
1856 fc->r_sz -= mem_sz;
1857 memmove(fc->r_buf, fc->r_buf + mem_sz, fc->r_sz);
1858
1859 uint8_t *buf_ptr = p_bigbuf;
1860
1861 if (! buf_ptr) {
1862 buf_ptr = stack_mem;
1863 mem_sz -= fc->r_buf_sz;
1864 }
1865
1866 buf_ptr += sizeof(msg_hdr);
1867
1868 // Save some state for after re-arm.
1869 cf_node node = fc->node->node_id;
1870 uint64_t bt = fc->benchmark_time;
1871 uint32_t ch = fc->pool->pool_id;
1872 msg_type type = fc->r_type;
1873 uint32_t msg_sz = fc->r_buf_sz - sizeof(msg_hdr);
1874
1875 fc->r_buf_sz = 0;
1876
1877 if (do_rearm) {
1878 // Re-arm for next message (possibly handled in another thread).
1879 fabric_connection_recv_rearm(fc); // do not use fc after this point
1880 }
1881
1882 while (true) {
1883 if (! msg_type_is_valid(type)) {
1884 cf_warning(AS_FABRIC, "failed to create message for type %u (max %u)", type, M_TYPE_MAX);
1885 cf_free(p_bigbuf);
1886 return false;
1887 }
1888
1889 msg *m = as_fabric_msg_get(type);
1890
1891 if (! msg_parse_fields(m, buf_ptr, msg_sz)) {
1892 cf_warning(AS_FABRIC, "msg_parse_fields failed for fc %p", fc);
1893 as_fabric_msg_put(m);
1894 cf_free(p_bigbuf);
1895 return false;
1896 }
1897
1898 if (g_fabric.msg_cb[m->type]) {
1899 (g_fabric.msg_cb[m->type])(node, m, g_fabric.msg_udata[m->type]);
1900
1901 if (bt != 0) {
1902 histogram_insert_data_point(g_stats.fabric_recv_cb_hists[ch],
1903 bt);
1904 }
1905 }
1906 else {
1907 cf_warning(AS_FABRIC, "process_msg: could not deliver message type %d", m->type);
1908 as_fabric_msg_put(m);
1909 }
1910
1911 if (p_bigbuf) {
1912 cf_free(p_bigbuf);
1913 p_bigbuf = NULL;
1914 buf_ptr = stack_mem;
1915 }
1916 else {
1917 buf_ptr += msg_sz;
1918 }
1919
1920 if (mem_sz < sizeof(msg_hdr)) {
			cf_assert(mem_sz == 0, AS_FABRIC, "process_msg: mem_sz left %u != 0", mem_sz);
1922 break;
1923 }
1924
1925 msg_parse_hdr(&msg_sz, &type, buf_ptr, mem_sz);
1926 buf_ptr += sizeof(msg_hdr);
1927 mem_sz -= sizeof(msg_hdr) + msg_sz;
1928 }
1929
1930 return true;
1931}
1932
1933// Return true on success.
1934// Must have re-armed on success.
1935static bool
1936fabric_connection_process_readable(fabric_connection *fc)
1937{
1938 size_t recv_all = 0;
1939
1940 while (true) {
1941 int32_t recv_sz;
1942
1943 if (! fc->r_bigbuf) {
1944 recv_sz = cf_socket_recv(&fc->sock, fc->r_buf + fc->r_sz,
1945 sizeof(fc->r_buf) - fc->r_sz, 0);
1946 }
1947 else {
1948 struct iovec iov[2] = {
1949 {
1950 .iov_base = fc->r_bigbuf + fc->r_sz,
1951 .iov_len = fc->r_buf_sz - fc->r_sz
1952 },
1953 {
1954 .iov_base = fc->r_buf,
1955 .iov_len = sizeof(fc->r_buf) // read ahead
1956 }
1957 };
1958
1959 struct msghdr recvhdr = {
1960 .msg_iov = iov,
1961 .msg_iovlen = 2
1962 };
1963
1964 recv_sz = cf_socket_recv_msg(&fc->sock, &recvhdr, 0);
1965 }
1966
1967 if (recv_sz < 0) {
1968 if (errno != EAGAIN && errno != EWOULDBLOCK) {
1969 cf_warning(AS_FABRIC, "fabric_connection_process_readable() recv_sz %d msg_sz %u errno %d %s", recv_sz, fc->r_buf_sz, errno, cf_strerror(errno));
1970 return false;
1971 }
1972
1973 break;
1974 }
1975 else if (recv_sz == 0) {
1976 cf_detail(AS_FABRIC, "fabric_connection_process_readable(%p) recv_sz 0 msg_sz %u", fc, fc->r_buf_sz);
1977 return false;
1978 }
1979
1980 fc->r_sz += recv_sz;
1981 fc->r_bytes += recv_sz;
1982 recv_all += recv_sz;
1983
1984 if (fc->r_buf_sz == 0) {
1985 fc->benchmark_time = g_config.fabric_benchmarks_enabled ?
1986 cf_getns() : 0;
1987
1988 if (fc->r_sz < sizeof(msg_hdr)) {
1989 break;
1990 }
1991
1992 msg_parse_hdr(&fc->r_buf_sz, &fc->r_type, fc->r_buf, fc->r_sz);
1993 fc->r_buf_sz += sizeof(msg_hdr);
1994
1995 if (fc->r_buf_sz > sizeof(fc->r_buf)) {
1996 if (fc->r_buf_sz > FABRIC_BUFFER_MAX_SZ) {
1997 cf_warning(AS_FABRIC, "fabric_connection_process_readable(%p) invalid msg_size %u remote 0x%lx", fc, fc->r_buf_sz, fabric_connection_get_id(fc));
1998 return false;
1999 }
2000
2001 fc->r_bigbuf = cf_malloc(fc->r_buf_sz);
2002				memcpy(fc->r_bigbuf, fc->r_buf, fc->r_sz); // fc->r_sz < fc->r_buf_sz here
2003 }
2004 }
2005
2006 if (fc->r_sz < fc->r_buf_sz) {
2007 if (fc->benchmark_time != 0) {
2008 fc->benchmark_time = histogram_insert_data_point(
2009 g_stats.fabric_recv_fragment_hists[fc->pool->pool_id],
2010 fc->benchmark_time);
2011 }
2012
2013 break;
2014 }
2015
2016 bool do_rearm =
2017 recv_all > (size_t)g_config.fabric_recv_rearm_threshold ||
2018 fc->r_buf_sz > g_config.fabric_recv_rearm_threshold;
2019
2020 if (! fabric_connection_process_msg(fc, do_rearm)) {
2021 return false;
2022 }
2023
2024 if (do_rearm) {
2025 // Already rearmed.
2026 return true;
2027 }
2028 }
2029
2030 fabric_connection_recv_rearm(fc);
2031 return true;
2032}
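
// A worked example of the do_rearm decision above, assuming (purely for
// illustration) a g_config.fabric_recv_rearm_threshold of 1024 bytes:
//   - one 8 KiB message            -> r_buf_sz > threshold -> re-arm first
//   - a long run of small messages -> recv_all > threshold -> re-arm first
//   - a single 200-byte message    -> neither              -> process, then re-arm
// Re-arming before running the message callbacks lets another thread in the
// recv pool keep draining this connection while this thread is busy in the
// callbacks.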
2033
2034
2035//==========================================================
2036// fabric_recv_thread_pool
2037//
2038
2039static void
2040fabric_recv_thread_pool_init(fabric_recv_thread_pool *pool, uint32_t size,
2041 uint32_t pool_id)
2042{
2043 cf_vector_init(&pool->threads, sizeof(cf_tid), size, 0);
2044 cf_poll_create(&pool->poll);
2045 pool->pool_id = pool_id;
2046}
2047
2048// Called only at startup or under set-config lock. Caller has checked size.
2049static void
2050fabric_recv_thread_pool_set_size(fabric_recv_thread_pool *pool, uint32_t size)
2051{
2052 while (size < cf_vector_size(&pool->threads)) {
2053 cf_tid tid;
2054
2055 cf_vector_pop(&pool->threads, &tid);
2056 cf_thread_cancel(tid);
2057 }
2058
2059 while (size > cf_vector_size(&pool->threads)) {
2060 cf_tid tid = cf_thread_create_detached(run_fabric_recv, (void*)pool);
2061
2062 cf_vector_append(&pool->threads, &tid);
2063 }
2064}
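
// Minimal usage sketch (hypothetical caller and field names, not code from
// this file): a dynamic-config handler resizing one channel's recv pool. Per
// the comment above, the real caller runs at startup or under the set-config
// lock and has already validated the requested size.
//
//   fabric_recv_thread_pool_set_size(&g_fabric.recv_pool[channel], new_size);
//
// Shrinking cancels surplus threads - run_fabric_recv() defers cancellation
// until its cf_thread_test_cancel() call at the top of each loop - while
// growing spawns detached threads that immediately poll the pool's shared
// epoll instance.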
2065
2066static void
2067fabric_recv_thread_pool_add_fc(fabric_recv_thread_pool *pool,
2068 fabric_connection *fc)
2069{
2070 fabric_connection_reserve(fc); // extra ref for poll
2071 fc->pool = pool;
2072
2073 uint32_t recv_events = EPOLLIN | DEFAULT_EVENTS;
2074
2075 cf_poll_add_socket(pool->poll, &fc->sock, recv_events, fc);
2076}
2077
2078
2079//==========================================================
2080// fabric_endpoint
2081//
2082
2083// Get the endpoint list to connect to the remote node.
2084// Returns true on success. On failure, errno is set to ENOENT if no endpoint
2085// list could be obtained for this node, or to ENOMEM if the input
2086// endpoint_list_size is less than the actual size, in which case
2087// endpoint_list_size is updated with the required capacity.
2088static bool
2089fabric_endpoint_list_get(cf_node nodeid, as_endpoint_list *endpoint_list,
2090 size_t *endpoint_list_size)
2091{
2092 as_hb_plugin_node_data plugin_data = {
2093 .data_capacity = *endpoint_list_size,
2094 .data = endpoint_list,
2095 .data_size = 0,
2096 };
2097
2098 if (as_hb_plugin_data_get(nodeid, AS_HB_PLUGIN_FABRIC, &plugin_data, NULL,
2099 NULL) == 0) {
2100 return plugin_data.data_size != 0;
2101 }
2102
2103 if (errno == ENOENT) {
2104 return false;
2105 }
2106
2107 // Not enough allocated memory.
2108 *endpoint_list_size = plugin_data.data_size;
2109
2110 return false;
2111}
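
// A minimal grow-and-retry sketch for the contract described above.
// Illustration only - the initial capacity, heap allocation, and error
// handling here are assumptions, not the actual connect-path code.
static as_endpoint_list *
endpoint_list_get_sketch(cf_node nodeid)
{
	size_t capacity = 1024;
	as_endpoint_list *list = cf_malloc(capacity);

	while (! fabric_endpoint_list_get(nodeid, list, &capacity)) {
		if (errno != ENOMEM) {
			// e.g. ENOENT - nothing published for this node (yet).
			cf_free(list);
			return NULL;
		}

		// capacity now holds the required size - grow and retry.
		list = cf_realloc(list, capacity);
	}

	return list;
}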
2112
2113// Filter out endpoints not matching this node's capabilities.
2114static bool
2115fabric_connect_endpoint_filter(const as_endpoint *endpoint, void *udata)
2116{
2117 if (cf_ip_addr_legacy_only() &&
2118 endpoint->addr_type == AS_ENDPOINT_ADDR_TYPE_IPv6) {
2119 return false;
2120 }
2121
2122 // If we don't offer TLS, then we won't connect via TLS, either.
2123 if (g_config.tls_fabric.bind_port == 0 &&
2124 as_endpoint_capability_is_supported(endpoint,
2125 AS_ENDPOINT_TLS_MASK)) {
2126 return false;
2127 }
2128
2129 return true;
2130}
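
// The filter above is applied per candidate endpoint when the connect path
// picks an address for a remote node, so an IPv4-only node, or one with TLS
// disabled, simply never dials endpoint types it cannot use - even if the
// remote side advertises them.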
2131
2132
2133//==========================================================
2134// Thread functions.
2135//
2136
2137static void *
2138run_fabric_recv(void *arg)
2139{
2140 cf_thread_disable_cancel();
2141
2142 fabric_recv_thread_pool *pool = (fabric_recv_thread_pool *)arg;
2143 static int worker_id_counter = 0;
2144 uint64_t worker_id = worker_id_counter++;
2145 cf_poll poll = pool->poll;
2146
2147 cf_detail(AS_FABRIC, "run_fabric_recv() created index %lu", worker_id);
2148
2149 while (true) {
2150 cf_thread_test_cancel();
2151
2152 cf_poll_event events[FABRIC_EPOLL_RECV_EVENTS];
2153 int32_t n = cf_poll_wait(poll, events, FABRIC_EPOLL_RECV_EVENTS, -1);
2154
2155 for (int32_t i = 0; i < n; i++) {
2156 fabric_connection *fc = events[i].data;
2157
2158 if (fc->node && ! fc->node->live) {
2159 fabric_connection_disconnect(fc);
2160 fabric_connection_release(fc);
2161 continue;
2162 }
2163
2164 // Handle remote close, socket errors.
2165 // Also triggered by call to cf_socket_shutdown(fc->sock), but only
2166 // first call.
2167 // Not triggered by cf_socket_close(fc->sock), which automatically
2168 // does EPOLL_CTL_DEL.
2169 if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
2170 cf_detail(AS_FABRIC, "%lu: epoll : error, will close: fc %p fd %d errno %d signal {err:%d, hup:%d, rdhup:%d}",
2171 worker_id,
2172 fc, CSFD(&fc->sock), errno,
2173 ((events[i].events & EPOLLERR) ? 1 : 0),
2174 ((events[i].events & EPOLLHUP) ? 1 : 0),
2175 ((events[i].events & EPOLLRDHUP) ? 1 : 0));
2176 fabric_connection_disconnect(fc);
2177 fabric_connection_release(fc);
2178 continue;
2179 }
2180
2181 cf_assert(events[i].events == EPOLLIN, AS_FABRIC, "epoll not setup correctly for %p", fc);
2182 uint32_t rearm_count = fc->r_rearm_count;
2183
2184 if (! fabric_connection_process_readable(fc)) {
2185 fabric_connection_disconnect(fc);
2186
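				// Release only if process_readable() never re-armed - an
				// unchanged rearm count means this thread still owns the
				// reference it was handed when the event fired.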
2187 if (rearm_count == fc->r_rearm_count) {
2188 fabric_connection_release(fc);
2189 }
2190
2191 continue;
2192 }
2193 }
2194 }
2195
2196 return NULL;
2197}
2198
2199static void *
2200run_fabric_send(void *arg)
2201{
2202 send_entry *se = (send_entry *)arg;
2203 cf_poll poll = se->poll;
2204
2205 cf_detail(AS_FABRIC, "run_fabric_send() fd %d id %u", poll.fd, se->id);
2206
2207 while (true) {
2208 cf_poll_event events[FABRIC_EPOLL_SEND_EVENTS];
2209 int32_t n = cf_poll_wait(poll, events, FABRIC_EPOLL_SEND_EVENTS, -1);
2210
2211 for (int32_t i = 0; i < n; i++) {
2212 fabric_connection *fc = events[i].data;
2213
2214 if (fc->node && ! fc->node->live) {
2215 fabric_connection_disconnect(fc);
2216 fabric_connection_send_unassign(fc);
2217 fabric_connection_release(fc);
2218 continue;
2219 }
2220
2221 // Handle remote close, socket errors. Also triggered by call to
2222			// cf_socket_shutdown(fc->sock), but only first call. Not triggered
2223			// by cf_socket_close(fc->sock), which automatically does EPOLL_CTL_DEL.
2224 if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
2225 cf_detail(AS_FABRIC, "epoll : error, will close: fc %p fd %d errno %d signal {err:%d, hup:%d, rdhup:%d}",
2226 fc, CSFD(&fc->sock), errno,
2227 ((events[i].events & EPOLLERR) ? 1 : 0),
2228 ((events[i].events & EPOLLHUP) ? 1 : 0),
2229 ((events[i].events & EPOLLRDHUP) ? 1 : 0));
2230 fabric_connection_disconnect(fc);
2231 fabric_connection_send_unassign(fc);
2232 fabric_connection_reroute_msg(fc);
2233 fabric_connection_release(fc);
2234 continue;
2235 }
2236
2237 if (tls_socket_needs_handshake(&fc->sock)) {
2238 if (! fabric_connection_connect_tls(fc)) {
2239 fabric_connection_disconnect(fc);
2240 fabric_connection_send_unassign(fc);
2241 fabric_connection_reroute_msg(fc);
2242 fabric_connection_release(fc);
2243 }
2244
2245 continue;
2246 }
2247
2248 cf_assert(events[i].events == EPOLLOUT, AS_FABRIC, "epoll not setup correctly for %p", fc);
2249
2250 if (! fabric_connection_process_writable(fc)) {
2251 fabric_connection_disconnect(fc);
2252 fabric_connection_send_unassign(fc);
2253 fabric_connection_reroute_msg(fc);
2254 fabric_connection_release(fc);
2255 continue;
2256 }
2257 }
2258 }
2259
2260	return NULL;
2261}
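
// Note that, unlike the recv loop above, every send-side failure path also
// calls fabric_connection_reroute_msg() - a send thread may be holding an
// unsent or partially sent message, which is presumably re-queued for another
// connection to that node rather than silently dropped.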
2262
2263static void *
2264run_fabric_accept(void *arg)
2265{
2266 cf_sockets sockset;
2267
2268 if (cf_socket_init_server(&g_fabric_bind, &sockset) < 0) {
2269 cf_crash(AS_FABRIC, "Could not create fabric listener socket - check configuration");
2270 }
2271
2272 cf_poll_create(&g_accept_poll);
2273 cf_poll_add_sockets(g_accept_poll, &sockset, EPOLLIN | EPOLLERR | EPOLLHUP);
2274 cf_socket_show_server(AS_FABRIC, "fabric", &sockset);
2275
2276 while (true) {
2277 // Accept new connections on the service socket.
2278 cf_poll_event events[64];
2279 int32_t n = cf_poll_wait(g_accept_poll, events, 64, -1);
2280
2281 for (int32_t i = 0; i < n; i++) {
2282 cf_socket *ssock = events[i].data;
2283
2284 if (cf_sockets_has_socket(&sockset, ssock)) {
2285 cf_socket csock;
2286 cf_sock_addr sa;
2287
2288 if (cf_socket_accept(ssock, &csock, &sa) < 0) {
2289 if (errno == EMFILE) {
2290 cf_warning(AS_FABRIC, "low on file descriptors");
2291 continue;
2292 }
2293 else {
2294 cf_crash(AS_FABRIC, "cf_socket_accept: %d %s", errno, cf_strerror(errno));
2295 }
2296 }
2297
2298 cf_detail(AS_FABRIC, "fabric_accept: accepting new sock %d", CSFD(&csock));
2299 cf_atomic64_incr(&g_stats.fabric_connections_opened);
2300
2301 fabric_connection *fc = fabric_connection_create(&csock, &sa);
2302
2303 cf_sock_cfg *cfg = ssock->cfg;
2304
2305 if (cfg->owner == CF_SOCK_OWNER_FABRIC_TLS) {
2306 tls_socket_prepare_server(g_fabric_tls, &fc->sock);
2307 }
2308
2309				uint32_t fc_events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
2310				cf_poll_add_socket(g_accept_poll, &fc->sock, fc_events, fc);
2311 }
2312 else {
2313 fabric_connection *fc = events[i].data;
2314
2315 if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
2316 fabric_connection_release(fc);
2317 continue;
2318 }
2319
2320 if (tls_socket_needs_handshake(&fc->sock)) {
2321 if (! fabric_connection_accept_tls(fc)) {
2322 fabric_connection_release(fc);
2323 }
2324
2325 continue;
2326 }
2327
2328 if (! fabric_connection_read_fabric_msg(fc)) {
2329 fabric_connection_release(fc);
2330 continue;
2331 }
2332 }
2333 }
2334 }
2335
2336	return NULL;
2337}
2338
2339static int
2340fabric_rate_node_reduce_fn(const void *key, uint32_t keylen, void *data,
2341 void *udata)
2342{
2343 fabric_node *node = (fabric_node *)data;
2344 fabric_rate *rate = (fabric_rate *)udata;
2345
2346 cf_mutex_lock(&node->fc_hash_lock);
2347 cf_shash_reduce(node->fc_hash, fabric_rate_fc_reduce_fn, rate);
2348 cf_mutex_unlock(&node->fc_hash_lock);
2349
2350 return 0;
2351}
2352
2353static int
2354fabric_rate_fc_reduce_fn(const void *key, void *data, void *udata)
2355{
2356 fabric_connection *fc = *(fabric_connection **)key;
2357 fabric_rate *rate = (fabric_rate *)udata;
2358
2359 if (! fc->pool) {
2360 return 0;
2361 }
2362
2363 uint32_t pool_id = fc->pool->pool_id;
2364 uint64_t r_bytes = fc->r_bytes;
2365 uint64_t s_bytes = fc->s_bytes;
2366
2367 rate->r_bytes[pool_id] += r_bytes - fc->r_bytes_last;
2368 rate->s_bytes[pool_id] += s_bytes - fc->s_bytes_last;
2369
2370 fc->r_bytes_last = r_bytes;
2371 fc->s_bytes_last = s_bytes;
2372
2373 return 0;
2374}
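
// Sketch only (hypothetical reporting helper, not code from this file): the
// per-pool byte deltas accumulated by the reduce functions above become a
// throughput figure once divided by the collection interval.
static void
fabric_rate_log_sketch(const fabric_rate *rate, uint32_t n_pools,
		uint64_t interval_sec)
{
	for (uint32_t i = 0; i < n_pools; i++) {
		cf_info(AS_FABRIC, "fabric pool %u recv %lu B/s send %lu B/s", i,
				rate->r_bytes[i] / interval_sec,
				rate->s_bytes[i] / interval_sec);
	}
}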
2375
2376
2377//==========================================================
2378// Heartbeat.
2379//
2380
2381// Set the fabric advertised endpoints.
2382static void
2383fabric_hb_plugin_set_fn(msg *m)
2384{
2385 if (m->type == M_TYPE_HEARTBEAT_V2) {
2386		// In heartbeat v1 and v2, fabric does not advertise its endpoints,
2387		// and those protocol versions do not support plugged-in data.
2388 return;
2389 }
2390
2391 if (! fabric_published_endpoints_refresh()) {
2392 cf_warning(AS_FABRIC, "No publish addresses found for fabric.");
2393 return;
2394 }
2395
2396 size_t payload_size = 0;
2397
2398 if (as_endpoint_list_sizeof(
2399 g_published_endpoint_list, &payload_size) != 0) {
2400 cf_crash(AS_FABRIC, "Error getting endpoint list size for published addresses.");
2401 }
2402
2403 msg_set_buf(m, AS_HB_MSG_FABRIC_DATA, (uint8_t *)g_published_endpoint_list,
2404 payload_size, MSG_SET_COPY);
2405}
2406
2407// Plugin function that parses fabric endpoints out of a heartbeat pulse.
2408static void
2409fabric_hb_plugin_parse_data_fn(msg *m, cf_node source,
2410 as_hb_plugin_node_data *prev_plugin_data,
2411 as_hb_plugin_node_data *plugin_data)
2412{
2413 if (m->type == M_TYPE_HEARTBEAT_V2) {
2414 plugin_data->data_size = 0;
2415 return;
2416 }
2417
2418 uint8_t *payload = NULL;
2419 size_t payload_size = 0;
2420
2421 if (msg_get_buf(m, AS_HB_MSG_FABRIC_DATA, &payload, &payload_size,
2422 MSG_GET_DIRECT) != 0) {
2423 cf_warning(AS_FABRIC, "Unable to read fabric published endpoint list from heartbeat from node %lx", source);
2424 return;
2425 }
2426
2427 if (payload_size > plugin_data->data_capacity) {
2428 // Round up to nearest multiple of block size to prevent very frequent
2429 // reallocation.
2430 size_t data_capacity = ((payload_size + HB_PLUGIN_DATA_BLOCK_SIZE - 1) /
2431 HB_PLUGIN_DATA_BLOCK_SIZE) * HB_PLUGIN_DATA_BLOCK_SIZE;
2432
2433 // Reallocate since we have outgrown existing capacity.
2434 plugin_data->data = cf_realloc(plugin_data->data, data_capacity);
2435
2436 plugin_data->data_capacity = data_capacity;
2437 }
2438
2439 plugin_data->data_size = payload_size;
2440
2441 memcpy(plugin_data->data, payload, payload_size);
2442}
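
// Worked example of the rounding above, assuming (for illustration only) an
// HB_PLUGIN_DATA_BLOCK_SIZE of 128: a 300-byte payload allocates
// ((300 + 127) / 128) * 128 = 384 bytes, so capacity only changes when a
// payload crosses a 128-byte boundary rather than on every small size change.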
2443
2444// Called when a node arrives on or departs from the heartbeat system.
2445// On arrival, this ensures an element exists in the hash table that keeps
2446// track of all TCP connections to that node; on departure, that node's
2447// connections are disconnected.
2448static void
2449fabric_heartbeat_event(int nevents, as_hb_event_node *events, void *udata)
2450{
2451 if ((nevents < 1) || (nevents > AS_CLUSTER_SZ) || ! events) {
2452 cf_warning(AS_FABRIC, "fabric: received event count of %d", nevents);
2453 return;
2454 }
2455
2456 for (int i = 0; i < nevents; i++) {
2457 switch (events[i].evt) {
2458 case AS_HB_NODE_ARRIVE: {
2459 fabric_node *node = fabric_node_get_or_create(events[i].nodeid);
2460 fabric_node_release(node); // for node_get_or_create()
2461
2462 cf_info(AS_FABRIC, "fabric: node %lx arrived", events[i].nodeid);
2463 }
2464 break;
2465 case AS_HB_NODE_DEPART:
2466 cf_info(AS_FABRIC, "fabric: node %lx departed", events[i].nodeid);
2467 fabric_node_disconnect(events[i].nodeid);
2468 break;
2469 case AS_HB_NODE_ADJACENCY_CHANGED:
2470 // Not relevant to fabric.
2471 break;
2472 default:
2473 cf_warning(AS_FABRIC, "fabric: received unknown event type %d %lx", events[i].evt, events[i].nodeid);
2474 break;
2475 }
2476 }
2477}
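
// This callback is registered with the heartbeat subsystem's listener API
// (declared in hb.h) when fabric starts up, so node-arrive events pre-create
// the fabric_node bookkeeping before any messages flow, and node-depart
// events tear down that node's connections.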
2478