1 | /* |
2 | * fabric.c |
3 | * |
4 | * Copyright (C) 2008-2018 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | // Object Management: |
24 | // ------------------ |
25 | // |
// Node and FC objects are reference counted. Correct bookkeeping of object
// references is vital to system operation.
28 | // |
29 | // Holders of FC references: |
30 | // (1) node->fc_hash |
31 | // (2) node->send_idle_fc_queue |
32 | // (3) (epoll_event ev).data.ptr |
33 | // |
34 | // For sending, (2) and (3) are mutually exclusive. |
// Refs are passed implicitly between (2) and (3) whenever possible, without
// explicit reserve/release calls.
// (3) takes a ref on rearm.
// (3) gives its ref to the calling thread when epoll triggers, due to ONESHOT.
// The thread must then either rearm or give the ref to (2), never both.
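//
// A minimal sketch of that hand-off, as seen by a send thread (illustrative
// only - the epoll plumbing and locking are condensed, and 'ev' and 'ch' are
// hypothetical locals):
//
//		// ONESHOT fired - the epoll ref now belongs to this thread.
//		fabric_connection *fc = ev.data.ptr;
//
//		if (fc->s_msg_in_progress) {
//			fabric_connection_send_rearm(fc); // ref passes back to (3)
//		}
//		else {
//			// ... or the ref goes to the idle queue (2) instead.
//			cf_queue_push(&fc->node->send_idle_fc_queue[ch], &fc);
//		}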
40 | // |
// FCs are created in two places: fabric_node_connect() and run_fabric_accept().
42 | // |
43 | // Holders of Node references: |
44 | // * fc->node |
45 | // * g_fabric.node_hash |
46 | |
47 | |
48 | //========================================================== |
49 | // Includes. |
50 | // |
51 | |
52 | #include "fabric/fabric.h" |
53 | |
54 | #include <errno.h> |
55 | #include <stdbool.h> |
56 | #include <stddef.h> |
#include <stdint.h>
#include <stdlib.h>
58 | #include <string.h> |
59 | #include <unistd.h> |
60 | |
61 | #include "citrusleaf/alloc.h" |
62 | #include "citrusleaf/cf_atomic.h" |
63 | #include "citrusleaf/cf_byte_order.h" |
64 | #include "citrusleaf/cf_clock.h" |
65 | #include "citrusleaf/cf_queue.h" |
66 | #include "citrusleaf/cf_vector.h" |
67 | |
68 | #include "cf_mutex.h" |
69 | #include "cf_thread.h" |
70 | #include "fault.h" |
71 | #include "msg.h" |
72 | #include "node.h" |
73 | #include "rchash.h" |
74 | #include "shash.h" |
75 | #include "socket.h" |
76 | #include "tls.h" |
77 | |
78 | #include "base/cfg.h" |
79 | #include "base/health.h" |
80 | #include "base/stats.h" |
81 | #include "fabric/endpoint.h" |
82 | #include "fabric/hb.h" |
83 | |
84 | |
85 | //========================================================== |
86 | // Typedefs & constants. |
87 | // |
88 | |
89 | #define FABRIC_SEND_MEM_SZ (1024) // bytes |
90 | #define FABRIC_BUFFER_MEM_SZ (1024 * 1024) // bytes |
91 | #define FABRIC_BUFFER_MAX_SZ (128 * 1024 * 1024) // used simply for validation |
92 | #define FABRIC_EPOLL_SEND_EVENTS 16 |
93 | #define FABRIC_EPOLL_RECV_EVENTS 1 |
94 | |
95 | typedef enum { |
96 | // These values go on the wire, so mind backward compatibility if changing. |
97 | FS_FIELD_NODE, |
98 | FS_UNUSED1, // used to be FS_ADDR |
99 | FS_UNUSED2, // used to be FS_PORT |
100 | FS_UNUSED3, // used to be FS_ANV |
101 | FS_UNUSED4, // used to be FS_ADDR_EX |
102 | FS_CHANNEL, |
103 | |
104 | NUM_FS_FIELDS |
105 | } fs_msg_fields; |
106 | |
107 | static const msg_template fabric_mt[] = { |
108 | { FS_FIELD_NODE, M_FT_UINT64 }, |
109 | { FS_UNUSED1, M_FT_UINT32 }, |
110 | { FS_UNUSED2, M_FT_UINT32 }, |
111 | { FS_UNUSED3, M_FT_BUF }, |
112 | { FS_UNUSED4, M_FT_BUF }, |
113 | { FS_CHANNEL, M_FT_UINT32 }, |
114 | }; |
115 | |
116 | COMPILER_ASSERT(sizeof(fabric_mt) / sizeof(msg_template) == NUM_FS_FIELDS); |
117 | |
118 | #define FS_MSG_SCRATCH_SIZE 0 |
119 | |
120 | #define DEFAULT_EVENTS (EPOLLERR | EPOLLHUP | EPOLLRDHUP | EPOLLONESHOT) |
121 | |
122 | // Block size for allocating fabric hb plugin data. |
123 | #define HB_PLUGIN_DATA_BLOCK_SIZE 128 |
124 | |
125 | typedef struct fabric_recv_thread_pool_s { |
126 | cf_vector threads; |
127 | cf_poll poll; |
128 | uint32_t pool_id; |
129 | } fabric_recv_thread_pool; |
130 | |
131 | typedef struct send_entry_s { |
132 | struct send_entry_s *next; |
133 | uint32_t id; |
134 | uint32_t count; |
135 | cf_poll poll; |
136 | } send_entry; |
137 | |
138 | typedef struct fabric_state_s { |
139 | as_fabric_msg_fn msg_cb[M_TYPE_MAX]; |
140 | void *msg_udata[M_TYPE_MAX]; |
141 | |
142 | fabric_recv_thread_pool recv_pool[AS_FABRIC_N_CHANNELS]; |
143 | |
144 | cf_mutex send_lock; |
145 | send_entry *sends; |
146 | send_entry *send_head; |
147 | |
148 | cf_mutex node_hash_lock; |
149 | cf_rchash *node_hash; // key is cf_node, value is (fabric_node *) |
150 | } fabric_state; |
151 | |
152 | typedef struct fabric_node_s { |
153 | cf_node node_id; // remote node |
154 | bool live; // set to false on shutdown |
155 | uint32_t connect_count[AS_FABRIC_N_CHANNELS]; |
156 | bool connect_full; |
157 | |
158 | cf_mutex connect_lock; |
159 | |
160 | cf_mutex fc_hash_lock; |
161 | cf_shash *fc_hash; // key is (fabric_connection *), value unused |
162 | |
163 | cf_mutex send_idle_fc_queue_lock; |
164 | cf_queue send_idle_fc_queue[AS_FABRIC_N_CHANNELS]; |
165 | |
166 | cf_queue send_queue[AS_FABRIC_N_CHANNELS]; |
167 | |
168 | uint8_t send_counts[]; |
169 | } fabric_node; |
170 | |
171 | typedef struct fabric_connection_s { |
172 | cf_socket sock; |
173 | cf_sock_addr peer; |
174 | fabric_node *node; |
175 | |
176 | bool failed; |
177 | bool started_via_connect; |
178 | |
179 | bool s_cork_bypass; |
180 | int s_cork; |
181 | uint8_t s_buf[FABRIC_SEND_MEM_SZ]; |
182 | struct iovec *s_iov; |
183 | size_t s_iov_count; |
184 | uint32_t s_sz; |
185 | uint32_t s_msg_sz; |
186 | msg *s_msg_in_progress; |
187 | size_t s_count; |
188 | |
189 | uint8_t *r_bigbuf; |
190 | uint8_t r_buf[FABRIC_BUFFER_MEM_SZ + sizeof(msg_hdr)]; |
191 | uint32_t r_rearm_count; |
192 | msg_type r_type; |
193 | uint32_t r_sz; |
194 | uint32_t r_buf_sz; |
195 | uint64_t benchmark_time; |
196 | |
	// send_ptr != NULL means the FC's sock is registered with a send thread's
	// poll. This is needed because epoll's API doesn't allow registering a
	// socket without event triggers (ERR and HUP are enabled even when
	// unspecified).
201 | send_entry *send_ptr; |
202 | fabric_recv_thread_pool *pool; |
203 | |
204 | uint64_t s_bytes; |
205 | uint64_t s_bytes_last; |
206 | uint64_t r_bytes; |
207 | uint64_t r_bytes_last; |
208 | } fabric_connection; |
209 | |
210 | typedef struct node_list_s { |
211 | uint32_t count; |
212 | cf_node nodes[AS_CLUSTER_SZ]; // must support the maximum cluster size. |
213 | } node_list; |
214 | |
const char *CHANNEL_NAMES[] = {
	[AS_FABRIC_CHANNEL_RW] = "rw",
	[AS_FABRIC_CHANNEL_CTRL] = "ctrl",
	[AS_FABRIC_CHANNEL_BULK] = "bulk",
	[AS_FABRIC_CHANNEL_META] = "meta",
};
221 | |
222 | COMPILER_ASSERT(sizeof(CHANNEL_NAMES) / sizeof(const char *) == |
223 | AS_FABRIC_N_CHANNELS); |
224 | |
225 | const bool channel_nagle[] = { |
226 | [AS_FABRIC_CHANNEL_RW] = false, |
227 | [AS_FABRIC_CHANNEL_CTRL] = false, |
228 | [AS_FABRIC_CHANNEL_BULK] = true, |
229 | [AS_FABRIC_CHANNEL_META] = false, |
230 | }; |
231 | |
232 | COMPILER_ASSERT(sizeof(channel_nagle) / sizeof(bool) == AS_FABRIC_N_CHANNELS); |
233 | |
234 | |
235 | //========================================================== |
236 | // Globals. |
237 | // |
238 | |
239 | cf_serv_cfg g_fabric_bind = { .n_cfgs = 0 }; |
240 | cf_tls_info *g_fabric_tls; |
241 | |
242 | static fabric_state g_fabric; |
243 | static cf_poll g_accept_poll; |
244 | |
245 | static as_endpoint_list *g_published_endpoint_list; |
246 | static bool g_published_endpoint_list_ipv4_only; |
247 | |
248 | // Max connections formed via connect. Others are formed via accept. |
249 | static uint32_t g_fabric_connect_limit[AS_FABRIC_N_CHANNELS]; |
250 | |
251 | |
252 | //========================================================== |
253 | // Forward declarations. |
254 | // |
255 | |
256 | // Support functions. |
257 | static void send_entry_insert(send_entry **se_pp, send_entry *se); |
258 | |
259 | static void fabric_published_serv_cfg_fill(const cf_serv_cfg *bind_cfg, cf_serv_cfg *published_cfg, bool ipv4_only); |
260 | static bool fabric_published_endpoints_refresh(void); |
261 | |
262 | // fabric_node |
263 | static fabric_node *fabric_node_create(cf_node node_id); |
264 | static fabric_node *fabric_node_get(cf_node node_id); |
265 | static fabric_node *fabric_node_get_or_create(cf_node node_id); |
266 | static fabric_node *fabric_node_pop(cf_node node_id); |
267 | static int fabric_node_disconnect_reduce_fn(const void *key, void *data, void *udata); |
268 | static void fabric_node_disconnect(cf_node node_id); |
269 | |
270 | static fabric_connection *fabric_node_connect(fabric_node *node, uint32_t ch); |
271 | static int fabric_node_send(fabric_node *node, msg *m, as_fabric_channel channel); |
272 | static void fabric_node_connect_all(fabric_node *node); |
273 | static void fabric_node_destructor(void *pnode); |
274 | inline static void fabric_node_reserve(fabric_node *node); |
275 | inline static void fabric_node_release(fabric_node *node); |
276 | static bool fabric_node_add_connection(fabric_node *node, fabric_connection *fc); |
277 | static uint8_t fabric_node_find_min_send_count(const fabric_node *node); |
278 | static bool fabric_node_is_connect_full(const fabric_node *node); |
279 | |
280 | static int fabric_get_node_list_fn(const void *key, uint32_t keylen, void *data, void *udata); |
281 | static uint32_t fabric_get_node_list(node_list *nl); |
282 | |
283 | // fabric_connection |
284 | fabric_connection *fabric_connection_create(cf_socket *sock, cf_sock_addr *peer); |
285 | static bool fabric_connection_accept_tls(fabric_connection *fc); |
286 | static bool fabric_connection_connect_tls(fabric_connection *fc); |
287 | inline static void fabric_connection_reserve(fabric_connection *fc); |
288 | static void fabric_connection_release(fabric_connection *fc); |
289 | inline static cf_node fabric_connection_get_id(const fabric_connection *fc); |
290 | |
291 | inline static void fabric_connection_cork(fabric_connection *fc); |
292 | inline static void fabric_connection_uncork(fabric_connection *fc); |
293 | static void fabric_connection_send_assign(fabric_connection *fc); |
294 | static void fabric_connection_send_unassign(fabric_connection *fc); |
295 | inline static void fabric_connection_recv_rearm(fabric_connection *fc); |
296 | inline static void fabric_connection_send_rearm(fabric_connection *fc); |
297 | static void fabric_connection_disconnect(fabric_connection *fc); |
298 | static void fabric_connection_set_keepalive_options(fabric_connection *fc); |
299 | |
300 | static void fabric_connection_reroute_msg(fabric_connection *fc); |
301 | static bool fabric_connection_send_progress(fabric_connection *fc); |
302 | static bool fabric_connection_process_writable(fabric_connection *fc); |
303 | |
304 | static bool fabric_connection_process_fabric_msg(fabric_connection *fc, const msg *m); |
305 | static bool fabric_connection_read_fabric_msg(fabric_connection *fc); |
306 | |
307 | static bool fabric_connection_process_msg(fabric_connection *fc, bool do_rearm); |
308 | static bool fabric_connection_process_readable(fabric_connection *fc); |
309 | |
310 | // fabric_recv_thread_pool |
311 | static void fabric_recv_thread_pool_init(fabric_recv_thread_pool *pool, uint32_t size, uint32_t pool_id); |
312 | static void fabric_recv_thread_pool_set_size(fabric_recv_thread_pool *pool, uint32_t size); |
313 | static void fabric_recv_thread_pool_add_fc(fabric_recv_thread_pool *pool, fabric_connection *fc); |
314 | |
315 | // fabric_endpoint |
316 | static bool fabric_endpoint_list_get(cf_node nodeid, as_endpoint_list *endpoint_list, size_t *endpoint_list_size); |
317 | static bool fabric_connect_endpoint_filter(const as_endpoint *endpoint, void *udata); |
318 | |
319 | // Thread functions. |
320 | static void *run_fabric_recv(void *arg); |
321 | static void *run_fabric_send(void *arg); |
322 | static void *run_fabric_accept(void *arg); |
323 | |
324 | // Ticker helpers. |
325 | static int fabric_rate_node_reduce_fn(const void *key, uint32_t keylen, void *data, void *udata); |
326 | static int fabric_rate_fc_reduce_fn(const void *key, void *data, void *udata); |
327 | |
328 | // Heartbeat. |
329 | static void fabric_hb_plugin_set_fn(msg *m); |
330 | static void fabric_hb_plugin_parse_data_fn(msg *m, cf_node source, as_hb_plugin_node_data *prev_plugin_data, as_hb_plugin_node_data *plugin_data); |
331 | static void fabric_heartbeat_event(int nevents, as_hb_event_node *events, void *udata); |
332 | |
333 | |
334 | //========================================================== |
335 | // Public API. |
336 | // |
337 | |
338 | //------------------------------------------------ |
339 | // msg |
340 | // |
341 | |
342 | // Log information about existing "msg" objects and queues. |
343 | void |
344 | as_fabric_msg_queue_dump() |
345 | { |
346 | cf_info(AS_FABRIC, "All currently-existing msg types:" ); |
347 | |
348 | int total_q_sz = 0; |
349 | int total_alloced_msgs = 0; |
350 | |
351 | for (int i = 0; i < M_TYPE_MAX; i++) { |
352 | int num_of_type = cf_atomic_int_get(g_num_msgs_by_type[i]); |
353 | |
354 | total_alloced_msgs += num_of_type; |
355 | |
356 | if (num_of_type) { |
357 | cf_info(AS_FABRIC, "alloc'd = %d" , num_of_type); |
358 | } |
359 | } |
360 | |
361 | int num_msgs = cf_atomic_int_get(g_num_msgs); |
362 | |
363 | if (abs(num_msgs - total_alloced_msgs) > 2) { |
		cf_warning(AS_FABRIC, "num msgs (%d) != total alloc'd msgs (%d)", num_msgs, total_alloced_msgs);
365 | } |
366 | |
367 | cf_info(AS_FABRIC, "Total num. msgs = %d ; Total num. queued = %d ; Delta = %d" , num_msgs, total_q_sz, num_msgs - total_q_sz); |
368 | } |
369 | |
370 | //------------------------------------------------ |
371 | // as_fabric |
372 | // |
373 | |
374 | void |
375 | as_fabric_init() |
376 | { |
377 | for (uint32_t i = 0; i < AS_FABRIC_N_CHANNELS; i++) { |
378 | g_fabric_connect_limit[i] = g_config.n_fabric_channel_fds[i]; |
379 | |
380 | fabric_recv_thread_pool_init(&g_fabric.recv_pool[i], |
381 | g_config.n_fabric_channel_recv_threads[i], i); |
382 | } |
383 | |
384 | cf_mutex_init(&g_fabric.send_lock); |
385 | |
386 | as_fabric_register_msg_fn(M_TYPE_FABRIC, fabric_mt, sizeof(fabric_mt), |
387 | FS_MSG_SCRATCH_SIZE, NULL, NULL); |
388 | |
389 | cf_mutex_init(&g_fabric.node_hash_lock); |
390 | |
391 | g_fabric.node_hash = cf_rchash_create(cf_nodeid_rchash_fn, |
392 | fabric_node_destructor, sizeof(cf_node), 128, 0); |
393 | |
394 | g_published_endpoint_list = NULL; |
395 | g_published_endpoint_list_ipv4_only = cf_ip_addr_legacy_only(); |
396 | |
397 | if (! fabric_published_endpoints_refresh()) { |
		cf_crash(AS_FABRIC, "error creating fabric published endpoint list");
399 | } |
400 | |
401 | as_hb_plugin fabric_plugin; |
402 | |
403 | memset(&fabric_plugin, 0, sizeof(fabric_plugin)); |
404 | fabric_plugin.id = AS_HB_PLUGIN_FABRIC; |
405 | fabric_plugin.wire_size_fixed = 0; // includes the size for the protocol version |
406 | as_endpoint_list_sizeof(g_published_endpoint_list, |
407 | &fabric_plugin.wire_size_fixed); |
	fabric_plugin.wire_size_per_node = 0; // size per node in succession list
409 | fabric_plugin.set_fn = fabric_hb_plugin_set_fn; |
410 | fabric_plugin.parse_fn = fabric_hb_plugin_parse_data_fn; |
411 | fabric_plugin.change_listener = NULL; |
412 | as_hb_plugin_register(&fabric_plugin); |
413 | |
414 | as_hb_register_listener(fabric_heartbeat_event, &g_fabric); |
415 | } |
416 | |
417 | void |
418 | as_fabric_start() |
419 | { |
420 | g_fabric.sends = |
421 | cf_malloc(sizeof(send_entry) * g_config.n_fabric_send_threads); |
422 | g_fabric.send_head = g_fabric.sends; |
423 | |
424 | cf_info(AS_FABRIC, "starting %u fabric send threads" , g_config.n_fabric_send_threads); |
425 | |
	for (uint32_t i = 0; i < g_config.n_fabric_send_threads; i++) {
427 | cf_poll_create(&g_fabric.sends[i].poll); |
428 | g_fabric.sends[i].id = i; |
429 | g_fabric.sends[i].count = 0; |
430 | g_fabric.sends[i].next = g_fabric.sends + i + 1; |
431 | |
432 | cf_thread_create_detached(run_fabric_send, (void*)&g_fabric.sends[i]); |
433 | } |
434 | |
435 | g_fabric.sends[g_config.n_fabric_send_threads - 1].next = NULL; |
436 | |
437 | for (uint32_t i = 0; i < AS_FABRIC_N_CHANNELS; i++) { |
438 | cf_info(AS_FABRIC, "starting %u fabric %s channel recv threads" , g_config.n_fabric_channel_recv_threads[i], CHANNEL_NAMES[i]); |
439 | |
440 | fabric_recv_thread_pool_set_size(&g_fabric.recv_pool[i], |
441 | g_config.n_fabric_channel_recv_threads[i]); |
442 | } |
443 | |
444 | cf_info(AS_FABRIC, "starting fabric accept thread" ); |
445 | |
446 | cf_thread_create_detached(run_fabric_accept, NULL); |
447 | } |
448 | |
449 | void |
450 | as_fabric_set_recv_threads(as_fabric_channel channel, uint32_t count) |
451 | { |
452 | g_config.n_fabric_channel_recv_threads[channel] = count; |
453 | |
454 | fabric_recv_thread_pool_set_size(&g_fabric.recv_pool[channel], count); |
455 | } |
456 | |
457 | int |
458 | as_fabric_send(cf_node node_id, msg *m, as_fabric_channel channel) |
459 | { |
460 | m->benchmark_time = g_config.fabric_benchmarks_enabled ? cf_getns() : 0; |
461 | |
462 | if (g_config.self_node == node_id) { |
		cf_assert(g_fabric.msg_cb[m->type], AS_FABRIC, "m->type %d not registered", m->type);
464 | (g_fabric.msg_cb[m->type])(node_id, m, g_fabric.msg_udata[m->type]); |
465 | |
466 | return AS_FABRIC_SUCCESS; |
467 | } |
468 | |
469 | fabric_node *node = fabric_node_get(node_id); |
470 | int ret = fabric_node_send(node, m, channel); |
471 | |
472 | if (node) { |
473 | fabric_node_release(node); // from fabric_node_get |
474 | } |
475 | |
476 | return ret; |
477 | } |
478 | |
479 | int |
480 | as_fabric_send_list(const cf_node *nodes, uint32_t node_count, msg *m, |
481 | as_fabric_channel channel) |
482 | { |
	cf_assert(nodes && node_count != 0, AS_FABRIC, "nodes list null or empty");
484 | |
485 | // TODO - if we implement an out-of-scope response when sending to self, |
486 | // remove this deferral. |
487 | bool send_self = false; |
488 | |
489 | for (uint32_t i = 0; i < node_count; i++) { |
490 | if (nodes[i] == g_config.self_node) { |
491 | send_self = true; |
492 | continue; |
493 | } |
494 | |
495 | msg_incr_ref(m); |
496 | |
497 | int ret = as_fabric_send(nodes[i], m, channel); |
498 | |
499 | if (ret != AS_FABRIC_SUCCESS) { |
500 | as_fabric_msg_put(m); |
501 | return ret; // caller releases main reference on failure |
502 | } |
503 | } |
504 | |
505 | if (send_self) { |
506 | // Shortcut - use main reference for fabric. |
507 | return as_fabric_send(g_config.self_node, m, channel); |
508 | } |
509 | |
510 | as_fabric_msg_put(m); // release main reference |
511 | |
512 | return AS_FABRIC_SUCCESS; |
513 | } |
514 | |
515 | int |
516 | as_fabric_retransmit(cf_node node_id, msg *m, as_fabric_channel channel) |
517 | { |
518 | // This function assumes the sender holds only a single reference to the |
519 | // msg. Do not use this function when there may be more than one reference |
520 | // to an unsent msg. |
521 | |
522 | if (cf_rc_count(m) > 1) { |
523 | // Msg should already be in the fabric queue - success. |
524 | return AS_FABRIC_SUCCESS; |
525 | } |
526 | |
527 | msg_incr_ref(m); |
528 | |
529 | int err = as_fabric_send(node_id, m, channel); |
530 | |
531 | if (err != AS_FABRIC_SUCCESS) { |
532 | as_fabric_msg_put(m); |
533 | return err; |
534 | } |
535 | |
536 | return AS_FABRIC_SUCCESS; |
537 | } |
538 | |
539 | // TODO - make static registration |
540 | void |
541 | as_fabric_register_msg_fn(msg_type type, const msg_template *mt, size_t mt_sz, |
542 | size_t scratch_sz, as_fabric_msg_fn msg_cb, void *msg_udata) |
543 | { |
544 | msg_type_register(type, mt, mt_sz, scratch_sz); |
545 | |
546 | g_fabric.msg_cb[type] = msg_cb; |
547 | g_fabric.msg_udata[type] = msg_udata; |
548 | } |
549 | |
550 | void |
551 | as_fabric_info_peer_endpoints_get(cf_dyn_buf *db) |
552 | { |
553 | node_list nl; |
554 | fabric_get_node_list(&nl); |
555 | |
556 | for (uint32_t i = 0; i < nl.count; i++) { |
557 | if (nl.nodes[i] == g_config.self_node) { |
558 | continue; |
559 | } |
560 | |
561 | fabric_node *node = fabric_node_get(nl.nodes[i]); |
562 | |
563 | if (! node) { |
			cf_info(AS_FABRIC, "\tnode %lx not found in hash although reported available", nl.nodes[i]);
565 | continue; |
566 | } |
567 | |
568 | size_t endpoint_list_capacity = 1024; |
569 | bool retry = true; |
570 | |
571 | while (true) { |
572 | uint8_t stack_mem[endpoint_list_capacity]; |
573 | as_endpoint_list *endpoint_list = (as_endpoint_list *)stack_mem; |
574 | |
575 | if (! fabric_endpoint_list_get(node->node_id, endpoint_list, |
576 | &endpoint_list_capacity)) { |
577 | if (errno == ENOENT) { |
578 | // No entry present for this node in heartbeat. |
579 | cf_detail(AS_FABRIC, "could not get endpoint list for %lx" , node->node_id); |
580 | break; |
581 | } |
582 | |
583 | if (! retry) { |
584 | break; |
585 | } |
586 | |
587 | retry = false; |
588 | continue; |
589 | } |
590 | |
591 | cf_dyn_buf_append_string(db, "fabric.peer=" ); |
592 | cf_dyn_buf_append_string(db, "node-id=" ); |
593 | cf_dyn_buf_append_uint64_x(db, node->node_id); |
594 | cf_dyn_buf_append_string(db, ":" ); |
595 | as_endpoint_list_info(endpoint_list, db); |
596 | cf_dyn_buf_append_string(db, ";" ); |
597 | break; |
598 | } |
599 | |
600 | fabric_node_release(node); |
601 | } |
602 | } |
603 | |
604 | bool |
605 | as_fabric_is_published_endpoint_list(const as_endpoint_list *list) |
606 | { |
607 | return as_endpoint_lists_are_equal(g_published_endpoint_list, list); |
608 | } |
609 | |
610 | // Used by heartbeat subsystem only, for duplicate node-id detection. |
611 | as_endpoint_list * |
612 | as_fabric_hb_plugin_get_endpoint_list(as_hb_plugin_node_data *plugin_data) |
613 | { |
614 | return (plugin_data && plugin_data->data_size != 0) ? |
615 | (as_endpoint_list *)plugin_data->data : NULL; |
616 | } |
617 | |
618 | void |
619 | as_fabric_rate_capture(fabric_rate *rate) |
620 | { |
621 | cf_mutex_lock(&g_fabric.node_hash_lock); |
622 | cf_rchash_reduce(g_fabric.node_hash, fabric_rate_node_reduce_fn, rate); |
623 | cf_mutex_unlock(&g_fabric.node_hash_lock); |
624 | } |
625 | |
626 | void |
627 | as_fabric_dump(bool verbose) |
628 | { |
629 | node_list nl; |
630 | fabric_get_node_list(&nl); |
631 | |
632 | cf_info(AS_FABRIC, " Fabric Dump: nodes known %d" , nl.count); |
633 | |
634 | for (uint32_t i = 0; i < nl.count; i++) { |
635 | if (nl.nodes[i] == g_config.self_node) { |
			cf_info(AS_FABRIC, "\tnode %lx is self", nl.nodes[i]);
637 | continue; |
638 | } |
639 | |
640 | fabric_node *node = fabric_node_get(nl.nodes[i]); |
641 | |
642 | if (! node) { |
			cf_info(AS_FABRIC, "\tnode %lx not found in hash although reported available", nl.nodes[i]);
644 | continue; |
645 | } |
646 | |
647 | cf_mutex_lock(&node->fc_hash_lock); |
		cf_info(AS_FABRIC, "\tnode %lx fds {via_connect={h=%d m=%d l=%d} all=%d} live %d q {h=%d m=%d l=%d}",
649 | node->node_id, |
650 | node->connect_count[AS_FABRIC_CHANNEL_CTRL], |
651 | node->connect_count[AS_FABRIC_CHANNEL_RW], |
652 | node->connect_count[AS_FABRIC_CHANNEL_BULK], |
653 | cf_shash_get_size(node->fc_hash), node->live, |
654 | cf_queue_sz(&node->send_queue[AS_FABRIC_CHANNEL_CTRL]), |
655 | cf_queue_sz(&node->send_queue[AS_FABRIC_CHANNEL_RW]), |
656 | cf_queue_sz(&node->send_queue[AS_FABRIC_CHANNEL_BULK])); |
657 | cf_mutex_unlock(&node->fc_hash_lock); |
658 | |
659 | fabric_node_release(node); // node_get |
660 | } |
661 | } |
662 | |
663 | |
664 | //========================================================== |
665 | // Support functions. |
666 | // |
667 | |
// Insert se into the singly linked list at *se_pp, keeping the list sorted by
// ascending count (least-loaded send thread first).
static void
669 | send_entry_insert(send_entry **se_pp, send_entry *se) |
670 | { |
671 | while (*se_pp && se->count > (*se_pp)->count) { |
672 | se_pp = &(*se_pp)->next; |
673 | } |
674 | |
675 | se->next = *se_pp; |
676 | *se_pp = se; |
677 | } |
678 | |
679 | // Get addresses to publish as serv config. Expand "any" addresses. |
680 | static void |
681 | fabric_published_serv_cfg_fill(const cf_serv_cfg *bind_cfg, |
682 | cf_serv_cfg *published_cfg, bool ipv4_only) |
683 | { |
684 | cf_serv_cfg_init(published_cfg); |
685 | |
686 | cf_sock_cfg sock_cfg; |
687 | |
	for (uint32_t i = 0; i < bind_cfg->n_cfgs; i++) {
689 | cf_sock_cfg_copy(&bind_cfg->cfgs[i], &sock_cfg); |
690 | |
691 | // Expand "any" address to all interfaces. |
692 | if (cf_ip_addr_is_any(&sock_cfg.addr)) { |
693 | cf_ip_addr all_addrs[CF_SOCK_CFG_MAX]; |
694 | uint32_t n_all_addrs = CF_SOCK_CFG_MAX; |
695 | |
696 | if (cf_inter_get_addr_all(all_addrs, &n_all_addrs) != 0) { |
				cf_warning(AS_FABRIC, "error getting all interface addresses");
698 | n_all_addrs = 0; |
699 | } |
700 | |
			for (uint32_t j = 0; j < n_all_addrs; j++) {
702 | // Skip local address if any is specified. |
703 | if (cf_ip_addr_is_local(&all_addrs[j]) || |
704 | (ipv4_only && ! cf_ip_addr_is_legacy(&all_addrs[j]))) { |
705 | continue; |
706 | } |
707 | |
708 | cf_ip_addr_copy(&all_addrs[j], &sock_cfg.addr); |
709 | |
710 | if (cf_serv_cfg_add_sock_cfg(published_cfg, &sock_cfg)) { |
					cf_crash(AS_FABRIC, "error initializing published address list");
712 | } |
713 | } |
714 | } |
715 | else { |
716 | if (ipv4_only && ! cf_ip_addr_is_legacy(&bind_cfg->cfgs[i].addr)) { |
717 | continue; |
718 | } |
719 | |
720 | if (cf_serv_cfg_add_sock_cfg(published_cfg, &sock_cfg)) { |
				cf_crash(AS_FABRIC, "error initializing published address list");
722 | } |
723 | } |
724 | } |
725 | } |
726 | |
727 | // Refresh the fabric published endpoint list. |
728 | // Return true on success. |
729 | static bool |
730 | fabric_published_endpoints_refresh() |
731 | { |
732 | if (g_published_endpoint_list && |
733 | g_published_endpoint_list_ipv4_only == cf_ip_addr_legacy_only()) { |
734 | return true; |
735 | } |
736 | |
	// The global flag has changed; refresh the published address list.
738 | if (g_published_endpoint_list) { |
739 | // Free the obsolete list. |
740 | cf_free(g_published_endpoint_list); |
741 | } |
742 | |
743 | cf_serv_cfg published_cfg; |
744 | fabric_published_serv_cfg_fill(&g_fabric_bind, &published_cfg, |
745 | g_published_endpoint_list_ipv4_only); |
746 | |
747 | g_published_endpoint_list = as_endpoint_list_from_serv_cfg(&published_cfg); |
	cf_assert(g_published_endpoint_list, AS_FABRIC, "error initializing mesh published address list");
749 | |
750 | g_published_endpoint_list_ipv4_only = cf_ip_addr_legacy_only(); |
751 | |
752 | if (g_published_endpoint_list->n_endpoints == 0) { |
753 | if (g_published_endpoint_list_ipv4_only) { |
754 | cf_warning(AS_FABRIC, "no IPv4 addresses configured for fabric" ); |
755 | } |
756 | else { |
757 | cf_warning(AS_FABRIC, "no addresses configured for fabric" ); |
758 | } |
759 | |
760 | return false; |
761 | } |
762 | |
763 | char endpoint_list_str[512]; |
764 | as_endpoint_list_to_string(g_published_endpoint_list, endpoint_list_str, |
765 | sizeof(endpoint_list_str)); |
766 | |
767 | cf_info(AS_FABRIC, "updated fabric published address list to {%s}" , endpoint_list_str); |
768 | |
769 | return true; |
770 | } |
771 | |
772 | |
773 | //========================================================== |
774 | // fabric_node |
775 | // |
776 | |
777 | static fabric_node * |
778 | fabric_node_create(cf_node node_id) |
779 | { |
780 | size_t size = sizeof(fabric_node) + |
781 | (sizeof(uint8_t) * g_config.n_fabric_send_threads); |
782 | fabric_node *node = cf_rc_alloc(size); |
783 | |
784 | memset(node, 0, size); |
785 | |
786 | node->node_id = node_id; |
787 | node->live = true; |
788 | |
789 | cf_mutex_init(&node->send_idle_fc_queue_lock); |
790 | |
791 | for (int i = 0; i < AS_FABRIC_N_CHANNELS; i++) { |
792 | cf_queue_init(&node->send_idle_fc_queue[i], sizeof(fabric_connection *), |
793 | CF_QUEUE_ALLOCSZ, false); |
794 | |
795 | cf_queue_init(&node->send_queue[i], sizeof(msg *), CF_QUEUE_ALLOCSZ, |
796 | true); |
797 | } |
798 | |
799 | cf_mutex_init(&node->connect_lock); |
800 | cf_mutex_init(&node->fc_hash_lock); |
801 | |
802 | node->fc_hash = cf_shash_create(cf_shash_fn_ptr, |
803 | sizeof(fabric_connection *), 0, 32, 0); |
804 | |
805 | cf_detail(AS_FABRIC, "fabric_node_create(%lx) node %p" , node_id, node); |
806 | |
807 | return node; |
808 | } |
809 | |
810 | static fabric_node * |
811 | fabric_node_get(cf_node node_id) |
812 | { |
813 | fabric_node *node; |
814 | |
815 | cf_mutex_lock(&g_fabric.node_hash_lock); |
816 | int rv = cf_rchash_get(g_fabric.node_hash, &node_id, sizeof(cf_node), |
817 | (void **)&node); |
818 | cf_mutex_unlock(&g_fabric.node_hash_lock); |
819 | |
820 | if (rv != CF_RCHASH_OK) { |
821 | return NULL; |
822 | } |
823 | |
824 | return node; |
825 | } |
826 | |
827 | static fabric_node * |
828 | fabric_node_get_or_create(cf_node node_id) |
829 | { |
830 | fabric_node *node; |
831 | |
832 | cf_mutex_lock(&g_fabric.node_hash_lock); |
833 | |
834 | if (cf_rchash_get(g_fabric.node_hash, &node_id, sizeof(cf_node), |
835 | (void **)&node) == CF_RCHASH_OK) { |
836 | cf_mutex_unlock(&g_fabric.node_hash_lock); |
837 | |
838 | fabric_node_connect_all(node); |
839 | |
840 | return node; |
841 | } |
842 | |
843 | node = fabric_node_create(node_id); |
844 | |
845 | if (cf_rchash_put_unique(g_fabric.node_hash, &node_id, sizeof(cf_node), |
846 | node) != CF_RCHASH_OK) { |
847 | cf_crash(AS_FABRIC, "fabric_node_get_or_create(%lx)" , node_id); |
848 | } |
849 | |
850 | fabric_node_reserve(node); // for return |
851 | |
852 | cf_mutex_unlock(&g_fabric.node_hash_lock); |
853 | |
854 | fabric_node_connect_all(node); |
855 | |
856 | return node; |
857 | } |
858 | |
// Remove the node from g_fabric.node_hash and return it - the hash's
// reference transfers to the caller.
static fabric_node *
860 | fabric_node_pop(cf_node node_id) |
861 | { |
862 | fabric_node *node = NULL; |
863 | |
864 | cf_mutex_lock(&g_fabric.node_hash_lock); |
865 | |
866 | if (cf_rchash_get(g_fabric.node_hash, &node_id, sizeof(cf_node), |
867 | (void **)&node) == CF_RCHASH_OK) { |
868 | if (cf_rchash_delete(g_fabric.node_hash, &node_id, sizeof(node_id)) != |
869 | CF_RCHASH_OK) { |
870 | cf_crash(AS_FABRIC, "fabric_node_pop(%lx)" , node_id); |
871 | } |
872 | } |
873 | |
874 | cf_mutex_unlock(&g_fabric.node_hash_lock); |
875 | |
876 | return node; |
877 | } |
878 | |
879 | static int |
880 | fabric_node_disconnect_reduce_fn(const void *key, void *data, void *udata) |
881 | { |
882 | fabric_connection *fc = *(fabric_connection **)key; |
883 | |
	cf_assert(fc, AS_FABRIC, "fc == NULL, don't put NULLs into fc_hash");
885 | cf_socket_shutdown(&fc->sock); |
886 | fabric_connection_release(fc); // for delete from node->fc_hash |
887 | |
888 | return CF_SHASH_REDUCE_DELETE; |
889 | } |
890 | |
891 | static void |
892 | fabric_node_disconnect(cf_node node_id) |
893 | { |
894 | fabric_node *node = fabric_node_pop(node_id); |
895 | |
896 | if (! node) { |
897 | cf_warning(AS_FABRIC, "fabric_node_disconnect(%lx) not connected" , node_id); |
898 | return; |
899 | } |
900 | |
901 | cf_info(AS_FABRIC, "fabric_node_disconnect(%lx)" , node_id); |
902 | |
903 | cf_mutex_lock(&node->fc_hash_lock); |
904 | |
905 | node->live = false; |
906 | // Clean up all fc's attached to this node. |
907 | cf_shash_reduce(node->fc_hash, fabric_node_disconnect_reduce_fn, NULL); |
908 | |
909 | cf_mutex_unlock(&node->fc_hash_lock); |
910 | |
911 | cf_mutex_lock(&node->send_idle_fc_queue_lock); |
912 | |
913 | for (int i = 0; i < AS_FABRIC_N_CHANNELS; i++) { |
914 | while (true) { |
915 | fabric_connection *fc; |
916 | |
917 | int rv = cf_queue_pop(&node->send_idle_fc_queue[i], &fc, |
918 | CF_QUEUE_NOWAIT); |
919 | |
920 | if (rv != CF_QUEUE_OK) { |
921 | break; |
922 | } |
923 | |
924 | fabric_connection_send_unassign(fc); |
925 | fabric_connection_release(fc); |
926 | } |
927 | } |
928 | |
929 | cf_mutex_unlock(&node->send_idle_fc_queue_lock); |
930 | |
931 | fabric_node_release(node); // from fabric_node_pop() |
932 | } |
933 | |
934 | static fabric_connection * |
935 | fabric_node_connect(fabric_node *node, uint32_t ch) |
936 | { |
937 | cf_detail(AS_FABRIC, "fabric_node_connect(%p, %u)" , node, ch); |
938 | |
939 | cf_mutex_lock(&node->connect_lock); |
940 | |
941 | uint32_t fds = node->connect_count[ch] + 1; |
942 | |
943 | if (fds > g_fabric_connect_limit[ch]) { |
944 | cf_mutex_unlock(&node->connect_lock); |
945 | return NULL; |
946 | } |
947 | |
948 | cf_socket sock; |
949 | cf_sock_addr addr; |
950 | size_t endpoint_list_capacity = 1024; |
951 | int tries_remaining = 3; |
952 | |
953 | while (tries_remaining--) { |
954 | uint8_t endpoint_list_mem[endpoint_list_capacity]; |
955 | as_endpoint_list *endpoint_list = (as_endpoint_list *)endpoint_list_mem; |
956 | |
957 | if (fabric_endpoint_list_get(node->node_id, endpoint_list, |
958 | &endpoint_list_capacity)) { |
959 | char endpoint_list_str[1024]; |
960 | |
961 | as_endpoint_list_to_string(endpoint_list, endpoint_list_str, |
962 | sizeof(endpoint_list_str)); |
963 | cf_detail(AS_FABRIC, "fabric_node_connect(%p, %u) node_id %lx with endpoints {%s}" , node, ch, node->node_id, endpoint_list_str); |
964 | |
965 | // Initiate connect to the remote endpoint. |
966 | const as_endpoint *connected_endpoint = as_endpoint_connect_any( |
967 | endpoint_list, fabric_connect_endpoint_filter, NULL, 0, |
968 | &sock); |
969 | |
970 | if (! connected_endpoint) { |
971 | cf_detail(AS_FABRIC, "fabric_node_connect(%p, %u) node_id %lx failed for endpoints {%s}" , node, ch, node->node_id, endpoint_list_str); |
972 | cf_mutex_unlock(&node->connect_lock); |
973 | return NULL; |
974 | } |
975 | |
976 | as_endpoint_to_sock_addr(connected_endpoint, &addr); |
977 | |
978 | if (as_endpoint_capability_is_supported(connected_endpoint, |
979 | AS_ENDPOINT_TLS_MASK)) { |
980 | tls_socket_prepare_client(g_fabric_tls, &sock); |
981 | } |
982 | |
983 | break; // read success |
984 | } |
985 | |
986 | if (errno == ENOENT) { |
987 | // No entry present for this node in heartbeat. |
988 | cf_detail(AS_FABRIC, "fabric_node_connect(%p, %u) unknown remote node %lx" , node, ch, node->node_id); |
989 | cf_mutex_unlock(&node->connect_lock); |
990 | return NULL; |
991 | } |
992 | |
993 | // The list capacity was not enough. Retry with suggested list size. |
994 | } |
995 | |
996 | if (tries_remaining < 0) { |
997 | cf_warning(AS_FABRIC,"fabric_node_connect(%p, %u) List get error for remote node %lx" , node, ch, node->node_id); |
998 | cf_mutex_unlock(&node->connect_lock); |
999 | return NULL; |
1000 | } |
1001 | |
1002 | msg *m = as_fabric_msg_get(M_TYPE_FABRIC); |
1003 | |
1004 | cf_atomic64_incr(&g_stats.fabric_connections_opened); |
1005 | as_health_add_node_counter(node->node_id, AS_HEALTH_NODE_FABRIC_FDS); |
1006 | |
1007 | msg_set_uint64(m, FS_FIELD_NODE, g_config.self_node); |
1008 | msg_set_uint32(m, FS_CHANNEL, ch); |
1009 | m->benchmark_time = g_config.fabric_benchmarks_enabled ? cf_getns() : 0; |
1010 | |
1011 | fabric_connection *fc = fabric_connection_create(&sock, &addr); |
1012 | |
1013 | fc->s_msg_in_progress = m; |
1014 | fc->started_via_connect = true; |
1015 | fc->pool = &g_fabric.recv_pool[ch]; |
1016 | |
1017 | if (! fabric_node_add_connection(node, fc)) { |
1018 | fabric_connection_release(fc); |
1019 | cf_mutex_unlock(&node->connect_lock); |
1020 | return NULL; |
1021 | } |
1022 | |
1023 | node->connect_count[ch]++; |
1024 | node->connect_full = fabric_node_is_connect_full(node); |
1025 | |
1026 | cf_mutex_unlock(&node->connect_lock); |
1027 | |
1028 | return fc; |
1029 | } |
1030 | |
1031 | static int |
1032 | fabric_node_send(fabric_node *node, msg *m, as_fabric_channel channel) |
1033 | { |
1034 | if (! node || ! node->live) { |
1035 | return AS_FABRIC_ERR_NO_NODE; |
1036 | } |
1037 | |
1038 | while (true) { |
		// Sync with fabric_connection_process_writable() to avoid a non-empty
		// send_queue while every fc sits in send_idle_fc_queue.
1041 | cf_mutex_lock(&node->send_idle_fc_queue_lock); |
1042 | |
1043 | fabric_connection *fc; |
1044 | int rv = cf_queue_pop(&node->send_idle_fc_queue[(int)channel], &fc, |
1045 | CF_QUEUE_NOWAIT); |
1046 | |
1047 | if (rv != CF_QUEUE_OK) { |
1048 | cf_queue_push(&node->send_queue[(int)channel], &m); |
1049 | cf_mutex_unlock(&node->send_idle_fc_queue_lock); |
1050 | |
1051 | if (! node->connect_full) { |
1052 | fabric_node_connect_all(node); |
1053 | } |
1054 | |
1055 | break; |
1056 | } |
1057 | |
1058 | cf_mutex_unlock(&node->send_idle_fc_queue_lock); |
1059 | |
1060 | if ((! cf_socket_exists(&fc->sock)) || fc->failed) { |
1061 | fabric_connection_send_unassign(fc); |
1062 | fabric_connection_release(fc); // send_idle_fc_queue |
1063 | continue; |
1064 | } |
1065 | |
1066 | fc->s_msg_in_progress = m; |
1067 | |
1068 | // Wake up. |
1069 | if (fc->send_ptr) { |
1070 | fabric_connection_send_rearm(fc); // takes fc ref |
1071 | } |
1072 | else { |
1073 | fabric_connection_send_assign(fc); // takes fc ref |
1074 | } |
1075 | |
1076 | break; |
1077 | } |
1078 | |
1079 | return AS_FABRIC_SUCCESS; |
1080 | } |
1081 | |
1082 | static void |
1083 | fabric_node_connect_all(fabric_node *node) |
1084 | { |
1085 | if (! node->live) { |
1086 | return; |
1087 | } |
1088 | |
1089 | for (uint32_t ch = 0; ch < AS_FABRIC_N_CHANNELS; ch++) { |
1090 | uint32_t n = g_fabric_connect_limit[ch] - node->connect_count[ch]; |
1091 | |
1092 | for (uint32_t i = 0; i < n; i++) { |
1093 | fabric_connection *fc = fabric_node_connect(node, ch); |
1094 | |
1095 | if (! fc) { |
1096 | break; |
1097 | } |
1098 | |
1099 | // TLS connections are one-way. Outgoing connections are for |
1100 | // outgoing data. |
1101 | if (fc->sock.state == CF_SOCKET_STATE_NON_TLS) { |
1102 | fabric_recv_thread_pool_add_fc(&g_fabric.recv_pool[ch], fc); |
1103 | cf_detail(AS_FABRIC, "{%16lX, %u} activated" , fabric_connection_get_id(fc), fc->sock.fd); |
1104 | |
1105 | if (channel_nagle[ch]) { |
1106 | fc->s_cork_bypass = true; |
1107 | cf_socket_enable_nagle(&fc->sock); |
1108 | } |
1109 | } |
1110 | else { |
1111 | fc->s_cork_bypass = true; |
1112 | } |
1113 | |
1114 | // Takes the remaining ref for send_poll and idle queue. |
1115 | fabric_connection_send_assign(fc); |
1116 | } |
1117 | } |
1118 | } |
1119 | |
1120 | static void |
1121 | fabric_node_destructor(void *pnode) |
1122 | { |
1123 | fabric_node *node = (fabric_node *)pnode; |
1124 | cf_detail(AS_FABRIC, "fabric_node_destructor(%p)" , node); |
1125 | |
1126 | for (int i = 0; i < AS_FABRIC_N_CHANNELS; i++) { |
1127 | // send_idle_fc_queue section. |
		cf_assert(cf_queue_sz(&node->send_idle_fc_queue[i]) == 0, AS_FABRIC, "send_idle_fc_queue not empty as expected");
1129 | cf_queue_destroy(&node->send_idle_fc_queue[i]); |
1130 | |
1131 | // send_queue section. |
1132 | while (true) { |
1133 | msg *m; |
1134 | |
1135 | if (cf_queue_pop(&node->send_queue[i], &m, CF_QUEUE_NOWAIT) != |
1136 | CF_QUEUE_OK) { |
1137 | break; |
1138 | } |
1139 | |
1140 | as_fabric_msg_put(m); |
1141 | } |
1142 | |
1143 | cf_queue_destroy(&node->send_queue[i]); |
1144 | } |
1145 | |
1146 | cf_mutex_destroy(&node->send_idle_fc_queue_lock); |
1147 | |
1148 | // connection_hash section. |
	cf_assert(cf_shash_get_size(node->fc_hash) == 0, AS_FABRIC, "fc_hash not empty as expected");
1150 | cf_shash_destroy(node->fc_hash); |
1151 | |
1152 | cf_mutex_destroy(&node->fc_hash_lock); |
1153 | } |
1154 | |
1155 | inline static void |
1156 | fabric_node_reserve(fabric_node *node) { |
1157 | cf_rc_reserve(node); |
1158 | } |
1159 | |
1160 | inline static void |
1161 | fabric_node_release(fabric_node *node) |
1162 | { |
1163 | if (cf_rc_release(node) == 0) { |
1164 | fabric_node_destructor(node); |
1165 | cf_rc_free(node); |
1166 | } |
1167 | } |
1168 | |
1169 | static bool |
1170 | fabric_node_add_connection(fabric_node *node, fabric_connection *fc) |
1171 | { |
1172 | cf_mutex_lock(&node->fc_hash_lock); |
1173 | |
1174 | if (! node->live) { |
1175 | cf_mutex_unlock(&node->fc_hash_lock); |
1176 | return false; |
1177 | } |
1178 | |
1179 | fabric_node_reserve(node); |
1180 | fc->node = node; |
1181 | |
1182 | fabric_connection_set_keepalive_options(fc); |
1183 | fabric_connection_reserve(fc); // for put into node->fc_hash |
1184 | |
1185 | uint8_t value = 0; |
1186 | int rv = cf_shash_put_unique(node->fc_hash, &fc, &value); |
1187 | |
	cf_assert(rv == CF_SHASH_OK, AS_FABRIC, "fabric_node_add_connection(%p, %p) failed to add with rv %d", node, fc, rv);
1189 | |
1190 | cf_mutex_unlock(&node->fc_hash_lock); |
1191 | |
1192 | return true; |
1193 | } |
1194 | |
1195 | static uint8_t |
1196 | fabric_node_find_min_send_count(const fabric_node *node) |
1197 | { |
1198 | uint8_t min = node->send_counts[0]; |
1199 | |
1200 | for (uint32_t i = 1; i < g_config.n_fabric_send_threads; i++) { |
1201 | if (node->send_counts[i] < min) { |
1202 | min = node->send_counts[i]; |
1203 | } |
1204 | } |
1205 | |
1206 | return min; |
1207 | } |
1208 | |
1209 | static bool |
1210 | fabric_node_is_connect_full(const fabric_node *node) |
1211 | { |
1212 | for (int ch = 0; ch < AS_FABRIC_N_CHANNELS; ch++) { |
1213 | if (node->connect_count[ch] < g_fabric_connect_limit[ch]) { |
1214 | return false; |
1215 | } |
1216 | } |
1217 | |
1218 | return true; |
1219 | } |
1220 | |
1221 | |
1222 | static int |
1223 | fabric_get_node_list_fn(const void *key, uint32_t keylen, void *data, |
1224 | void *udata) |
1225 | { |
1226 | node_list *nl = (node_list *)udata; |
1227 | |
1228 | if (nl->count == AS_CLUSTER_SZ) { |
1229 | return 0; |
1230 | } |
1231 | |
1232 | nl->nodes[nl->count] = *(const cf_node *)key; |
1233 | nl->count++; |
1234 | |
1235 | return 0; |
1236 | } |
1237 | |
1238 | // Get a list of all the nodes - use a dynamic array, which requires inline. |
1239 | static uint32_t |
1240 | fabric_get_node_list(node_list *nl) |
1241 | { |
1242 | nl->count = 1; |
1243 | nl->nodes[0] = g_config.self_node; |
1244 | |
1245 | cf_mutex_lock(&g_fabric.node_hash_lock); |
1246 | cf_rchash_reduce(g_fabric.node_hash, fabric_get_node_list_fn, nl); |
1247 | cf_mutex_unlock(&g_fabric.node_hash_lock); |
1248 | |
1249 | return nl->count; |
1250 | } |
1251 | |
1252 | |
1253 | //========================================================== |
1254 | // fabric_connection |
1255 | // |
1256 | |
1257 | fabric_connection * |
1258 | fabric_connection_create(cf_socket *sock, cf_sock_addr *peer) |
1259 | { |
1260 | fabric_connection *fc = cf_rc_alloc(sizeof(fabric_connection)); |
1261 | |
1262 | memset(fc, 0, sizeof(fabric_connection)); |
1263 | |
1264 | cf_socket_copy(sock, &fc->sock); |
1265 | cf_sock_addr_copy(peer, &fc->peer); |
1266 | |
1267 | fc->r_type = M_TYPE_FABRIC; |
1268 | |
1269 | return fc; |
1270 | } |
1271 | |
1272 | static bool |
1273 | fabric_connection_accept_tls(fabric_connection *fc) |
1274 | { |
1275 | int32_t tls_ev = tls_socket_accept(&fc->sock); |
1276 | |
1277 | if (tls_ev == EPOLLERR) { |
1278 | cf_warning(AS_FABRIC, "fabric TLS server handshake with %s failed" , cf_sock_addr_print(&fc->peer)); |
1279 | return false; |
1280 | } |
1281 | |
1282 | if (tls_ev == 0) { |
		tls_socket_must_not_have_data(&fc->sock, "fabric server handshake");
1284 | tls_ev = EPOLLIN; |
1285 | } |
1286 | |
1287 | cf_poll_modify_socket(g_accept_poll, &fc->sock, |
1288 | tls_ev | EPOLLERR | EPOLLHUP | EPOLLRDHUP, fc); |
1289 | return true; |
1290 | } |
1291 | |
1292 | static bool |
1293 | fabric_connection_connect_tls(fabric_connection *fc) |
1294 | { |
1295 | int32_t tls_ev = tls_socket_connect(&fc->sock); |
1296 | |
1297 | if (tls_ev == EPOLLERR) { |
1298 | cf_warning(AS_FABRIC, "fabric TLS client handshake with %s failed" , cf_sock_addr_print(&fc->peer)); |
1299 | return false; |
1300 | } |
1301 | |
1302 | if (tls_ev == 0) { |
		tls_socket_must_not_have_data(&fc->sock, "fabric client handshake");
1304 | tls_ev = EPOLLOUT; |
1305 | } |
1306 | |
1307 | cf_poll_modify_socket(fc->send_ptr->poll, &fc->sock, |
1308 | tls_ev | DEFAULT_EVENTS, fc); |
1309 | return true; |
1310 | } |
1311 | |
1312 | inline static void |
1313 | fabric_connection_reserve(fabric_connection *fc) |
1314 | { |
1315 | cf_rc_reserve(fc); |
1316 | } |
1317 | |
1318 | static void |
1319 | fabric_connection_release(fabric_connection *fc) |
1320 | { |
1321 | if (cf_rc_release(fc) == 0) { |
1322 | if (fc->s_msg_in_progress) { |
1323 | // First message (s_count == 0) is initial M_TYPE_FABRIC message |
1324 | // and does not need to be saved. |
1325 | if (! fc->started_via_connect || fc->s_count != 0) { |
1326 | cf_queue_push(&fc->node->send_queue[fc->pool->pool_id], |
1327 | &fc->s_msg_in_progress); |
1328 | } |
1329 | else { |
1330 | as_fabric_msg_put(fc->s_msg_in_progress); |
1331 | } |
1332 | } |
1333 | |
1334 | if (fc->node) { |
1335 | fabric_node_release(fc->node); |
1336 | fc->node = NULL; |
1337 | } |
1338 | else { |
1339 | cf_detail(AS_FABRIC, "releasing fc %p not attached to a node" , fc); |
1340 | } |
1341 | |
1342 | cf_socket_close(&fc->sock); |
1343 | cf_socket_term(&fc->sock); |
1344 | cf_atomic64_incr(&g_stats.fabric_connections_closed); |
1345 | |
1346 | cf_free(fc->r_bigbuf); |
1347 | cf_rc_free(fc); |
1348 | } |
1349 | } |
1350 | |
1351 | inline static cf_node |
1352 | fabric_connection_get_id(const fabric_connection *fc) |
1353 | { |
1354 | if (fc->node) { |
1355 | return fc->node->node_id; |
1356 | } |
1357 | |
1358 | return 0; |
1359 | } |
1360 | |
1361 | inline static void |
1362 | fabric_connection_cork(fabric_connection *fc) |
1363 | { |
1364 | if (fc->s_cork == 1 || fc->s_cork_bypass) { |
1365 | return; |
1366 | } |
1367 | |
1368 | fc->s_cork = 1; |
1369 | cf_socket_set_cork(&fc->sock, fc->s_cork); |
1370 | } |
1371 | |
1372 | inline static void |
1373 | fabric_connection_uncork(fabric_connection *fc) |
1374 | { |
1375 | if (fc->s_cork == 0 || fc->s_cork_bypass) { |
1376 | return; |
1377 | } |
1378 | |
1379 | fc->s_cork = 0; |
1380 | cf_socket_set_cork(&fc->sock, fc->s_cork); |
1381 | } |
1382 | |
// Assign fc to the least-loaded send thread. Registering with that thread's
// epoll takes ownership of one fc reference.
1384 | static void |
1385 | fabric_connection_send_assign(fabric_connection *fc) |
1386 | { |
1387 | cf_mutex_lock(&g_fabric.send_lock); |
1388 | |
1389 | send_entry **pp = &g_fabric.send_head; |
1390 | uint8_t min = fabric_node_find_min_send_count(fc->node); |
1391 | |
1392 | while (true) { |
1393 | uint32_t send_id = (*pp)->id; |
1394 | |
1395 | if (fc->node->send_counts[send_id] == min) { |
1396 | break; |
1397 | } |
1398 | |
		cf_assert((*pp)->next, AS_FABRIC, "fabric_connection_send_assign() invalid send_count state");
1400 | |
1401 | pp = &(*pp)->next; |
1402 | } |
1403 | |
1404 | send_entry *se = *pp; |
1405 | |
1406 | se->count++; |
1407 | fc->node->send_counts[se->id]++; |
1408 | |
1409 | if (se->next && se->next->count < se->count) { |
1410 | *pp = se->next; |
1411 | send_entry_insert(pp, se); |
1412 | } |
1413 | |
1414 | fc->send_ptr = se; |
1415 | |
1416 | cf_mutex_unlock(&g_fabric.send_lock); |
1417 | |
1418 | cf_poll_add_socket(se->poll, &fc->sock, EPOLLOUT | DEFAULT_EVENTS, fc); |
1419 | } |
1420 | |
1421 | static void |
1422 | fabric_connection_send_unassign(fabric_connection *fc) |
1423 | { |
1424 | cf_mutex_lock(&g_fabric.send_lock); |
1425 | |
1426 | if (! fc->send_ptr) { |
1427 | cf_mutex_unlock(&g_fabric.send_lock); |
1428 | return; |
1429 | } |
1430 | |
1431 | send_entry **pp = &g_fabric.send_head; |
1432 | send_entry *se = fc->send_ptr; |
1433 | |
1434 | while (*pp != se) { |
		cf_assert((*pp)->next, AS_FABRIC, "fabric_connection_send_unassign() invalid send_count state");
1436 | |
1437 | pp = &(*pp)->next; |
1438 | } |
1439 | |
	cf_assert(se->count != 0 && fc->node->send_counts[se->id] != 0, AS_FABRIC, "invalid send_count accounting se %p id %u count %u node send_count %u",
1441 | se, se->id, se->count, fc->node->send_counts[se->id]); |
1442 | |
1443 | se->count--; |
1444 | fc->node->send_counts[se->id]--; |
1445 | |
1446 | *pp = se->next; |
1447 | send_entry_insert(&g_fabric.send_head, se); |
1448 | |
1449 | fc->send_ptr = NULL; |
1450 | |
1451 | cf_mutex_unlock(&g_fabric.send_lock); |
1452 | } |
1453 | |
1454 | inline static void |
1455 | fabric_connection_recv_rearm(fabric_connection *fc) |
1456 | { |
1457 | fc->r_rearm_count++; |
1458 | cf_poll_modify_socket(fc->pool->poll, &fc->sock, |
1459 | EPOLLIN | DEFAULT_EVENTS, fc); |
1460 | } |
1461 | |
// Rearming passes one fc reference back to epoll.
1463 | inline static void |
1464 | fabric_connection_send_rearm(fabric_connection *fc) |
1465 | { |
1466 | cf_poll_modify_socket(fc->send_ptr->poll, &fc->sock, |
1467 | EPOLLOUT | DEFAULT_EVENTS, fc); |
1468 | } |
1469 | |
1470 | static void |
1471 | fabric_connection_disconnect(fabric_connection *fc) |
1472 | { |
1473 | fc->failed = true; |
1474 | cf_socket_shutdown(&fc->sock); |
1475 | |
1476 | fabric_node *node = fc->node; |
1477 | |
1478 | if (! node) { |
1479 | return; |
1480 | } |
1481 | |
1482 | cf_mutex_lock(&node->fc_hash_lock); |
1483 | |
1484 | if (cf_shash_delete(node->fc_hash, &fc) != CF_SHASH_OK) { |
1485 | cf_detail(AS_FABRIC, "fc %p is not in (node %p)->fc_hash" , fc, node); |
1486 | cf_mutex_unlock(&node->fc_hash_lock); |
1487 | return; |
1488 | } |
1489 | |
1490 | cf_mutex_unlock(&node->fc_hash_lock); |
1491 | |
1492 | if (fc->started_via_connect) { |
1493 | cf_mutex_lock(&node->connect_lock); |
1494 | |
1495 | cf_atomic32_decr(&node->connect_count[fc->pool->pool_id]); |
1496 | node->connect_full = false; |
1497 | |
1498 | cf_mutex_unlock(&node->connect_lock); |
1499 | } |
1500 | |
1501 | cf_mutex_lock(&node->send_idle_fc_queue_lock); |
1502 | |
1503 | if (cf_queue_delete(&node->send_idle_fc_queue[fc->pool->pool_id], &fc, |
1504 | true) == CF_QUEUE_OK) { |
1505 | fabric_connection_send_unassign(fc); |
1506 | fabric_connection_release(fc); // for delete from send_idle_fc_queue |
1507 | } |
1508 | |
1509 | cf_mutex_unlock(&node->send_idle_fc_queue_lock); |
1510 | |
1511 | cf_detail(AS_FABRIC, "fabric_connection_disconnect(%p) {pool=%u id=%lx fd=%u}" , |
1512 | fc, fc->pool ? fc->pool->pool_id : 0, |
1513 | node ? node->node_id : (cf_node)0, fc->sock.fd); |
1514 | |
1515 | fabric_connection_release(fc); // for delete from node->fc_hash |
1516 | } |
1517 | |
1518 | static void |
1519 | fabric_connection_set_keepalive_options(fabric_connection *fc) |
1520 | { |
1521 | if (g_config.fabric_keepalive_enabled) { |
1522 | cf_socket_keep_alive(&fc->sock, g_config.fabric_keepalive_time, |
1523 | g_config.fabric_keepalive_intvl, |
1524 | g_config.fabric_keepalive_probes); |
1525 | } |
1526 | } |
1527 | |
1528 | static void |
1529 | fabric_connection_reroute_msg(fabric_connection *fc) |
1530 | { |
1531 | if (! fc->s_msg_in_progress) { |
1532 | return; |
1533 | } |
1534 | |
1535 | // Don't reroute initial M_TYPE_FABRIC message. |
1536 | if ((fc->started_via_connect && fc->s_count == 0) || |
1537 | fabric_node_send(fc->node, fc->s_msg_in_progress, |
1538 | fc->pool->pool_id) != AS_FABRIC_SUCCESS) { |
1539 | as_fabric_msg_put(fc->s_msg_in_progress); |
1540 | } |
1541 | |
1542 | fc->s_msg_in_progress = NULL; |
1543 | } |
1544 | |
// Advance the iovec cursor past sz bytes just sent - partial-send bookkeeping
// for fabric_connection_send_progress().
static void
1546 | fabric_connection_incr_iov(fabric_connection *fc, uint32_t sz) |
1547 | { |
1548 | while (sz != 0) { |
1549 | if (sz <= fc->s_iov->iov_len) { |
1550 | fc->s_iov->iov_base = (uint8_t *)fc->s_iov->iov_base + sz; |
1551 | fc->s_iov->iov_len -= sz; |
1552 | return; |
1553 | } |
1554 | |
		cf_assert(fc->s_iov_count != 0, AS_FABRIC, "fc->s_iov_count == 0");
1556 | sz -= fc->s_iov->iov_len; |
1557 | fc->s_iov->iov_len = 0; |
1558 | fc->s_iov++; |
1559 | fc->s_iov_count--; |
1560 | } |
1561 | } |
1562 | |
1563 | static bool |
1564 | fabric_connection_send_progress(fabric_connection *fc) |
1565 | { |
1566 | if (fc->s_msg_sz == 0) { // new msg |
1567 | msg *m = fc->s_msg_in_progress; |
1568 | |
1569 | fc->s_iov = (struct iovec *)fc->s_buf; |
1570 | fc->s_iov_count = msg_to_iov_buf(m, fc->s_buf, sizeof(fc->s_buf), |
1571 | &fc->s_msg_sz); |
1572 | fc->s_sz = 0; |
1573 | |
1574 | if (m->benchmark_time != 0) { |
1575 | m->benchmark_time = histogram_insert_data_point( |
1576 | g_stats.fabric_send_init_hists[fc->pool->pool_id], |
1577 | m->benchmark_time); |
1578 | } |
1579 | } |
1580 | |
1581 | struct msghdr sendhdr = { |
1582 | .msg_iov = fc->s_iov, |
1583 | .msg_iovlen = fc->s_iov_count |
1584 | }; |
1585 | |
1586 | int32_t send_sz = cf_socket_send_msg(&fc->sock, &sendhdr, 0); |
1587 | |
1588 | if (send_sz < 0) { |
1589 | if (errno != EAGAIN && errno != EWOULDBLOCK) { |
1590 | return false; |
1591 | } |
1592 | |
1593 | send_sz = 0; // treat as sending 0 |
1594 | } |
1595 | |
1596 | if (fc->s_msg_in_progress->benchmark_time != 0) { |
1597 | fc->s_msg_in_progress->benchmark_time = histogram_insert_data_point( |
1598 | g_stats.fabric_send_fragment_hists[fc->pool->pool_id], |
1599 | fc->s_msg_in_progress->benchmark_time); |
1600 | } |
1601 | |
1602 | fc->s_sz += send_sz; |
1603 | fc->s_bytes += send_sz; |
1604 | |
1605 | if (fc->s_sz == fc->s_msg_sz) { // complete send |
1606 | as_fabric_msg_put(fc->s_msg_in_progress); |
1607 | fc->s_msg_in_progress = NULL; |
1608 | fc->s_msg_sz = 0; |
1609 | fc->s_count++; |
1610 | } |
1611 | else { // partial send |
1612 | fabric_connection_incr_iov(fc, (uint32_t)send_sz); |
1613 | } |
1614 | |
1615 | return true; |
1616 | } |
1617 | |
1618 | // Must rearm or place into idle queue on success. |
1619 | static bool |
1620 | fabric_connection_process_writable(fabric_connection *fc) |
1621 | { |
1622 | fabric_node *node = fc->node; |
1623 | uint32_t pool = fc->pool->pool_id; |
1624 | |
1625 | if (! fc->s_msg_in_progress) { |
		// TODO - change to a load op when the atomic API is ready.
		// Should be rare, and may not occur at all on x86_64.
		cf_warning(AS_FABRIC, "fc(%p)->s_msg_in_progress NULL on entry", fc);
1629 | return false; |
1630 | } |
1631 | |
1632 | fabric_connection_cork(fc); |
1633 | |
1634 | while (true) { |
1635 | if (! fabric_connection_send_progress(fc)) { |
1636 | return false; |
1637 | } |
1638 | |
1639 | if (fc->s_msg_in_progress) { |
1640 | fabric_connection_send_rearm(fc); |
1641 | return true; |
1642 | } |
1643 | |
1644 | if (cf_queue_pop(&node->send_queue[pool], &fc->s_msg_in_progress, |
1645 | CF_QUEUE_NOWAIT) != CF_QUEUE_OK) { |
1646 | break; |
1647 | } |
1648 | } |
1649 | |
1650 | fabric_connection_uncork(fc); |
1651 | |
1652 | if (! fc->node->live || fc->failed) { |
1653 | return false; |
1654 | } |
1655 | |
1656 | // Try with bigger lock block to sync with as_fabric_send(). |
1657 | cf_mutex_lock(&node->send_idle_fc_queue_lock); |
1658 | |
1659 | if (! fc->node->live || fc->failed) { |
1660 | cf_mutex_unlock(&node->send_idle_fc_queue_lock); |
1661 | return false; |
1662 | } |
1663 | |
1664 | if (cf_queue_pop(&node->send_queue[pool], &fc->s_msg_in_progress, |
1665 | CF_QUEUE_NOWAIT) == CF_QUEUE_EMPTY) { |
1666 | cf_queue_push(&node->send_idle_fc_queue[pool], &fc); |
1667 | cf_mutex_unlock(&node->send_idle_fc_queue_lock); |
1668 | return true; |
1669 | } |
1670 | |
1671 | cf_mutex_unlock(&node->send_idle_fc_queue_lock); |
1672 | |
1673 | fabric_connection_send_rearm(fc); |
1674 | |
1675 | return true; |
1676 | } |
1677 | |
1678 | // Return true on success. |
1679 | static bool |
1680 | fabric_connection_process_fabric_msg(fabric_connection *fc, const msg *m) |
1681 | { |
1682 | cf_poll_delete_socket(g_accept_poll, &fc->sock); |
1683 | |
1684 | cf_node node_id; |
1685 | |
1686 | if (msg_get_uint64(m, FS_FIELD_NODE, &node_id) != 0) { |
1687 | cf_warning(AS_FABRIC, "process_fabric_msg: failed to read M_TYPE_FABRIC node" ); |
1688 | return false; |
1689 | } |
1690 | |
1691 | cf_detail(AS_FABRIC, "process_fabric_msg: M_TYPE_FABRIC from node %lx" , node_id); |
1692 | |
1693 | fabric_node *node = fabric_node_get_or_create(node_id); |
1694 | |
1695 | if (! fabric_node_add_connection(node, fc)) { |
1696 | fabric_node_release(node); // from cf_rchash_get |
1697 | return false; |
1698 | } |
1699 | |
1700 | uint32_t pool_id = AS_FABRIC_N_CHANNELS; // illegal value |
1701 | |
1702 | msg_get_uint32(m, FS_CHANNEL, &pool_id); |
1703 | |
1704 | if (pool_id >= AS_FABRIC_N_CHANNELS) { |
1705 | fabric_node_release(node); // from cf_rchash_get |
1706 | return false; |
1707 | } |
1708 | |
1709 | fc->r_sz = 0; |
1710 | fc->r_buf_sz = 0; |
1711 | |
1712 | // fc->pool needs to be set before placing into send_idle_fc_queue. |
1713 | fabric_recv_thread_pool_add_fc(&g_fabric.recv_pool[pool_id], fc); |
1714 | |
1715 | // TLS connections are one-way. Incoming connections are for |
1716 | // incoming data. |
1717 | if (fc->sock.state == CF_SOCKET_STATE_NON_TLS) { |
1718 | if (channel_nagle[pool_id]) { |
1719 | fc->s_cork_bypass = true; |
1720 | cf_socket_enable_nagle(&fc->sock); |
1721 | } |
1722 | |
1723 | cf_mutex_lock(&node->send_idle_fc_queue_lock); |
1724 | |
1725 | if (node->live && ! fc->failed) { |
1726 | fabric_connection_reserve(fc); // for send poll & idleQ |
1727 | |
1728 | if (cf_queue_pop(&node->send_queue[pool_id], &fc->s_msg_in_progress, |
1729 | CF_QUEUE_NOWAIT) == CF_QUEUE_EMPTY) { |
1730 | cf_queue_push(&node->send_idle_fc_queue[pool_id], &fc); |
1731 | } |
1732 | else { |
1733 | fabric_connection_send_assign(fc); |
1734 | } |
1735 | } |
1736 | |
1737 | cf_mutex_unlock(&node->send_idle_fc_queue_lock); |
1738 | } |
1739 | else { |
1740 | fc->s_cork_bypass = true; |
1741 | } |
1742 | |
	fabric_node_release(node); // from fabric_node_get_or_create()
1744 | fabric_connection_release(fc); // from g_accept_poll |
1745 | |
1746 | return true; |
1747 | } |
1748 | |
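// Return true on success. Reads the initial M_TYPE_FABRIC msg with which the
// remote node identifies itself on an accepted connection.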
1749 | static bool |
1750 | fabric_connection_read_fabric_msg(fabric_connection *fc) |
1751 | { |
1752 | while (true) { |
1753 | int32_t recv_sz = cf_socket_recv(&fc->sock, fc->r_buf + fc->r_sz, |
1754 | sizeof(msg_hdr) + fc->r_buf_sz - fc->r_sz, 0); |
1755 | |
1756 | if (recv_sz < 0) { |
1757 | if (errno != EAGAIN && errno != EWOULDBLOCK) { |
1758 | cf_warning(AS_FABRIC, "fabric_connection_read_fabric_msg() recv_sz %d errno %d %s" , recv_sz, errno, cf_strerror(errno)); |
1759 | return false; |
1760 | } |
1761 | |
1762 | break; |
1763 | } |
1764 | |
1765 | if (recv_sz == 0) { |
1766 | cf_detail(AS_FABRIC, "fabric_connection_read_fabric_msg(%p) recv_sz 0 r_msg_sz %u" , fc, fc->r_buf_sz); |
1767 | return false; |
1768 | } |
1769 | |
1770 | fc->r_sz += recv_sz; |
1771 | fc->r_bytes += recv_sz; |
1772 | |
1773 | if (fc->r_buf_sz == 0) { |
1774 | if (fc->r_sz < sizeof(msg_hdr)) { |
1775 | tls_socket_must_not_have_data(&fc->sock, "partial fabric read" ); |
1776 | break; |
1777 | } |
1778 | |
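			// Note - here r_buf_sz is the payload size only; the header is
			// accounted for separately below.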
1779 | msg_parse_hdr(&fc->r_buf_sz, &fc->r_type, fc->r_buf, fc->r_sz); |
1780 | |
1781 | if (fc->r_buf_sz >= sizeof(fc->r_buf)) { |
1782 | cf_warning(AS_FABRIC, "r_msg_sz > sizeof(fc->r_membuf) %zu" , sizeof(fc->r_buf)); |
1783 | return false; |
1784 | } |
1785 | |
1786 | if (fc->r_buf_sz != 0) { |
1787 | continue; |
1788 | } |
1789 | } |
1790 | |
1791 | if (fc->r_sz < sizeof(msg_hdr) + fc->r_buf_sz) { |
1792 | tls_socket_must_not_have_data(&fc->sock, "partial fabric read" ); |
1793 | break; |
1794 | } |
1795 | |
1796 | tls_socket_must_not_have_data(&fc->sock, "full fabric read" ); |
1797 | |
1798 | if (fc->r_type != M_TYPE_FABRIC) { |
1799 | cf_warning(AS_FABRIC, "fabric_connection_read_fabric_msg() expected type M_TYPE_FABRIC(%d) got type %d" , M_TYPE_FABRIC, fc->r_type); |
1800 | return false; |
1801 | } |
1802 | |
1803 | msg *m = as_fabric_msg_get(M_TYPE_FABRIC); |
1804 | |
1805 | if (! msg_parse_fields(m, fc->r_buf + sizeof(msg_hdr), fc->r_buf_sz)) { |
1806 | cf_warning(AS_FABRIC, "msg_parse failed for fc %p" , fc); |
1807 | as_fabric_msg_put(m); |
1808 | return false; |
1809 | } |
1810 | |
1811 | bool ret = fabric_connection_process_fabric_msg(fc, m); |
1812 | |
1813 | as_fabric_msg_put(m); |
1814 | |
1815 | return ret; |
1816 | } |
1817 | |
1818 | return true; |
1819 | } |
1820 | |
1821 | // Return true on success. |
1822 | // Must have re-armed on success if do_rearm == true. |
1823 | static bool |
1824 | fabric_connection_process_msg(fabric_connection *fc, bool do_rearm) |
1825 | { |
	cf_assert(fc->node, AS_FABRIC, "process_msg: no node assigned");
1827 | |
1828 | uint32_t read_ahead_sz = fc->r_sz - fc->r_buf_sz; |
1829 | uint32_t mem_sz = fc->r_buf_sz; |
1830 | uint8_t *p_bigbuf = fc->r_bigbuf; // malloc handoff |
1831 | |
1832 | fc->r_bigbuf = NULL; |
1833 | |
1834 | if (p_bigbuf) { |
1835 | fc->r_sz = read_ahead_sz; |
1836 | mem_sz = 0; |
1837 | } |
1838 | |
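	// Scan any read-ahead for additional complete msgs so they get copied to
	// the stack and parsed in this same pass.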
1839 | while (read_ahead_sz > sizeof(msg_hdr)) { |
1840 | read_ahead_sz -= sizeof(msg_hdr); |
1841 | |
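		// First header field is the msg size, big-endian on the wire.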
1842 | uint32_t *ptr = (uint32_t *)(fc->r_buf + mem_sz); |
1843 | uint32_t sz = cf_swap_from_be32(*ptr); |
1844 | |
1845 | if (read_ahead_sz < sz) { |
1846 | break; |
1847 | } |
1848 | |
1849 | mem_sz += sizeof(msg_hdr) + sz; |
1850 | read_ahead_sz -= sz; |
1851 | } |
1852 | |
1853 | uint8_t stack_mem[mem_sz + 1]; // +1 to account for mem_sz == 0 |
1854 | |
1855 | memcpy(stack_mem, fc->r_buf, mem_sz); |
1856 | fc->r_sz -= mem_sz; |
1857 | memmove(fc->r_buf, fc->r_buf + mem_sz, fc->r_sz); |
1858 | |
1859 | uint8_t *buf_ptr = p_bigbuf; |
1860 | |
1861 | if (! buf_ptr) { |
1862 | buf_ptr = stack_mem; |
1863 | mem_sz -= fc->r_buf_sz; |
1864 | } |
1865 | |
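	// Skip the first msg's header. From here, mem_sz counts only the batched
	// follow-on msgs.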
1866 | buf_ptr += sizeof(msg_hdr); |
1867 | |
1868 | // Save some state for after re-arm. |
1869 | cf_node node = fc->node->node_id; |
1870 | uint64_t bt = fc->benchmark_time; |
1871 | uint32_t ch = fc->pool->pool_id; |
1872 | msg_type type = fc->r_type; |
1873 | uint32_t msg_sz = fc->r_buf_sz - sizeof(msg_hdr); |
1874 | |
1875 | fc->r_buf_sz = 0; |
1876 | |
1877 | if (do_rearm) { |
1878 | // Re-arm for next message (possibly handled in another thread). |
1879 | fabric_connection_recv_rearm(fc); // do not use fc after this point |
1880 | } |
1881 | |
1882 | while (true) { |
1883 | if (! msg_type_is_valid(type)) { |
1884 | cf_warning(AS_FABRIC, "failed to create message for type %u (max %u)" , type, M_TYPE_MAX); |
1885 | cf_free(p_bigbuf); |
1886 | return false; |
1887 | } |
1888 | |
1889 | msg *m = as_fabric_msg_get(type); |
1890 | |
1891 | if (! msg_parse_fields(m, buf_ptr, msg_sz)) { |
1892 | cf_warning(AS_FABRIC, "msg_parse_fields failed for fc %p" , fc); |
1893 | as_fabric_msg_put(m); |
1894 | cf_free(p_bigbuf); |
1895 | return false; |
1896 | } |
1897 | |
1898 | if (g_fabric.msg_cb[m->type]) { |
1899 | (g_fabric.msg_cb[m->type])(node, m, g_fabric.msg_udata[m->type]); |
1900 | |
1901 | if (bt != 0) { |
1902 | histogram_insert_data_point(g_stats.fabric_recv_cb_hists[ch], |
1903 | bt); |
1904 | } |
1905 | } |
1906 | else { |
1907 | cf_warning(AS_FABRIC, "process_msg: could not deliver message type %d" , m->type); |
1908 | as_fabric_msg_put(m); |
1909 | } |
1910 | |
1911 | if (p_bigbuf) { |
1912 | cf_free(p_bigbuf); |
1913 | p_bigbuf = NULL; |
1914 | buf_ptr = stack_mem; |
1915 | } |
1916 | else { |
1917 | buf_ptr += msg_sz; |
1918 | } |
1919 | |
1920 | if (mem_sz < sizeof(msg_hdr)) { |
			cf_assert(mem_sz == 0, AS_FABRIC, "process_msg: mem_sz left %u != 0", mem_sz);
1922 | break; |
1923 | } |
1924 | |
1925 | msg_parse_hdr(&msg_sz, &type, buf_ptr, mem_sz); |
1926 | buf_ptr += sizeof(msg_hdr); |
1927 | mem_sz -= sizeof(msg_hdr) + msg_sz; |
1928 | } |
1929 | |
1930 | return true; |
1931 | } |
1932 | |
1933 | // Return true on success. |
1934 | // Must have re-armed on success. |
1935 | static bool |
1936 | fabric_connection_process_readable(fabric_connection *fc) |
1937 | { |
1938 | size_t recv_all = 0; |
1939 | |
1940 | while (true) { |
1941 | int32_t recv_sz; |
1942 | |
1943 | if (! fc->r_bigbuf) { |
1944 | recv_sz = cf_socket_recv(&fc->sock, fc->r_buf + fc->r_sz, |
1945 | sizeof(fc->r_buf) - fc->r_sz, 0); |
1946 | } |
1947 | else { |
1948 | struct iovec iov[2] = { |
1949 | { |
1950 | .iov_base = fc->r_bigbuf + fc->r_sz, |
1951 | .iov_len = fc->r_buf_sz - fc->r_sz |
1952 | }, |
1953 | { |
1954 | .iov_base = fc->r_buf, |
1955 | .iov_len = sizeof(fc->r_buf) // read ahead |
1956 | } |
1957 | }; |
1958 | |
1959 | struct msghdr recvhdr = { |
1960 | .msg_iov = iov, |
1961 | .msg_iovlen = 2 |
1962 | }; |
1963 | |
1964 | recv_sz = cf_socket_recv_msg(&fc->sock, &recvhdr, 0); |
1965 | } |
1966 | |
1967 | if (recv_sz < 0) { |
1968 | if (errno != EAGAIN && errno != EWOULDBLOCK) { |
1969 | cf_warning(AS_FABRIC, "fabric_connection_process_readable() recv_sz %d msg_sz %u errno %d %s" , recv_sz, fc->r_buf_sz, errno, cf_strerror(errno)); |
1970 | return false; |
1971 | } |
1972 | |
1973 | break; |
1974 | } |
1975 | else if (recv_sz == 0) { |
1976 | cf_detail(AS_FABRIC, "fabric_connection_process_readable(%p) recv_sz 0 msg_sz %u" , fc, fc->r_buf_sz); |
1977 | return false; |
1978 | } |
1979 | |
1980 | fc->r_sz += recv_sz; |
1981 | fc->r_bytes += recv_sz; |
1982 | recv_all += recv_sz; |
1983 | |
1984 | if (fc->r_buf_sz == 0) { |
1985 | fc->benchmark_time = g_config.fabric_benchmarks_enabled ? |
1986 | cf_getns() : 0; |
1987 | |
1988 | if (fc->r_sz < sizeof(msg_hdr)) { |
1989 | break; |
1990 | } |
1991 | |
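			// Unlike the handshake path, r_buf_sz is bumped here to include
			// the header.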
1992 | msg_parse_hdr(&fc->r_buf_sz, &fc->r_type, fc->r_buf, fc->r_sz); |
1993 | fc->r_buf_sz += sizeof(msg_hdr); |
1994 | |
1995 | if (fc->r_buf_sz > sizeof(fc->r_buf)) { |
1996 | if (fc->r_buf_sz > FABRIC_BUFFER_MAX_SZ) { |
1997 | cf_warning(AS_FABRIC, "fabric_connection_process_readable(%p) invalid msg_size %u remote 0x%lx" , fc, fc->r_buf_sz, fabric_connection_get_id(fc)); |
1998 | return false; |
1999 | } |
2000 | |
2001 | fc->r_bigbuf = cf_malloc(fc->r_buf_sz); |
				memcpy(fc->r_bigbuf, fc->r_buf, fc->r_sz); // fc->r_sz < fc->r_buf_sz here
2003 | } |
2004 | } |
2005 | |
2006 | if (fc->r_sz < fc->r_buf_sz) { |
2007 | if (fc->benchmark_time != 0) { |
2008 | fc->benchmark_time = histogram_insert_data_point( |
2009 | g_stats.fabric_recv_fragment_hists[fc->pool->pool_id], |
2010 | fc->benchmark_time); |
2011 | } |
2012 | |
2013 | break; |
2014 | } |
2015 | |
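		// Rearm before processing large msgs, or after consuming many bytes,
		// so another thread can service this socket while we process.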
2016 | bool do_rearm = |
2017 | recv_all > (size_t)g_config.fabric_recv_rearm_threshold || |
2018 | fc->r_buf_sz > g_config.fabric_recv_rearm_threshold; |
2019 | |
2020 | if (! fabric_connection_process_msg(fc, do_rearm)) { |
2021 | return false; |
2022 | } |
2023 | |
2024 | if (do_rearm) { |
2025 | // Already rearmed. |
2026 | return true; |
2027 | } |
2028 | } |
2029 | |
2030 | fabric_connection_recv_rearm(fc); |
2031 | return true; |
2032 | } |
2033 | |
2034 | |
2035 | //========================================================== |
2036 | // fabric_recv_thread_pool |
2037 | // |
2038 | |
2039 | static void |
2040 | fabric_recv_thread_pool_init(fabric_recv_thread_pool *pool, uint32_t size, |
2041 | uint32_t pool_id) |
2042 | { |
2043 | cf_vector_init(&pool->threads, sizeof(cf_tid), size, 0); |
2044 | cf_poll_create(&pool->poll); |
2045 | pool->pool_id = pool_id; |
2046 | } |
2047 | |
2048 | // Called only at startup or under set-config lock. Caller has checked size. |
2049 | static void |
2050 | fabric_recv_thread_pool_set_size(fabric_recv_thread_pool *pool, uint32_t size) |
2051 | { |
2052 | while (size < cf_vector_size(&pool->threads)) { |
2053 | cf_tid tid; |
2054 | |
2055 | cf_vector_pop(&pool->threads, &tid); |
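		// Canceled threads exit at cf_thread_test_cancel() in
		// run_fabric_recv().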
2056 | cf_thread_cancel(tid); |
2057 | } |
2058 | |
2059 | while (size > cf_vector_size(&pool->threads)) { |
2060 | cf_tid tid = cf_thread_create_detached(run_fabric_recv, (void*)pool); |
2061 | |
2062 | cf_vector_append(&pool->threads, &tid); |
2063 | } |
2064 | } |
2065 | |
2066 | static void |
2067 | fabric_recv_thread_pool_add_fc(fabric_recv_thread_pool *pool, |
2068 | fabric_connection *fc) |
2069 | { |
2070 | fabric_connection_reserve(fc); // extra ref for poll |
2071 | fc->pool = pool; |
2072 | |
2073 | uint32_t recv_events = EPOLLIN | DEFAULT_EVENTS; |
2074 | |
2075 | cf_poll_add_socket(pool->poll, &fc->sock, recv_events, fc); |
2076 | } |
2077 | |
2078 | |
2079 | //========================================================== |
2080 | // fabric_endpoint |
2081 | // |
2082 | |
// Get the endpoint list to connect to the remote node.
// Returns true on success. On failure, errno is set to ENOENT if no endpoint
// list could be obtained for this node, or to ENOMEM if the input
// endpoint_list_size is less than the actual size, in which case
// endpoint_list_size is updated with the required capacity.
2088 | static bool |
2089 | fabric_endpoint_list_get(cf_node nodeid, as_endpoint_list *endpoint_list, |
2090 | size_t *endpoint_list_size) |
2091 | { |
2092 | as_hb_plugin_node_data plugin_data = { |
2093 | .data_capacity = *endpoint_list_size, |
2094 | .data = endpoint_list, |
2095 | .data_size = 0, |
2096 | }; |
2097 | |
2098 | if (as_hb_plugin_data_get(nodeid, AS_HB_PLUGIN_FABRIC, &plugin_data, NULL, |
2099 | NULL) == 0) { |
2100 | return plugin_data.data_size != 0; |
2101 | } |
2102 | |
2103 | if (errno == ENOENT) { |
2104 | return false; |
2105 | } |
2106 | |
2107 | // Not enough allocated memory. |
2108 | *endpoint_list_size = plugin_data.data_size; |
2109 | |
2110 | return false; |
2111 | } |
2112 | |
2113 | // Filter out endpoints not matching this node's capabilities. |
2114 | static bool |
2115 | fabric_connect_endpoint_filter(const as_endpoint *endpoint, void *udata) |
2116 | { |
2117 | if (cf_ip_addr_legacy_only() && |
2118 | endpoint->addr_type == AS_ENDPOINT_ADDR_TYPE_IPv6) { |
2119 | return false; |
2120 | } |
2121 | |
2122 | // If we don't offer TLS, then we won't connect via TLS, either. |
2123 | if (g_config.tls_fabric.bind_port == 0 && |
2124 | as_endpoint_capability_is_supported(endpoint, |
2125 | AS_ENDPOINT_TLS_MASK)) { |
2126 | return false; |
2127 | } |
2128 | |
2129 | return true; |
2130 | } |
2131 | |
2132 | |
2133 | //========================================================== |
2134 | // Thread functions. |
2135 | // |
2136 | |
2137 | static void * |
2138 | run_fabric_recv(void *arg) |
2139 | { |
2140 | cf_thread_disable_cancel(); |
2141 | |
2142 | fabric_recv_thread_pool *pool = (fabric_recv_thread_pool *)arg; |
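	// Note - increment is not atomic; worker_id is only used for logging.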
2143 | static int worker_id_counter = 0; |
2144 | uint64_t worker_id = worker_id_counter++; |
2145 | cf_poll poll = pool->poll; |
2146 | |
2147 | cf_detail(AS_FABRIC, "run_fabric_recv() created index %lu" , worker_id); |
2148 | |
2149 | while (true) { |
2150 | cf_thread_test_cancel(); |
2151 | |
2152 | cf_poll_event events[FABRIC_EPOLL_RECV_EVENTS]; |
2153 | int32_t n = cf_poll_wait(poll, events, FABRIC_EPOLL_RECV_EVENTS, -1); |
2154 | |
2155 | for (int32_t i = 0; i < n; i++) { |
2156 | fabric_connection *fc = events[i].data; |
2157 | |
2158 | if (fc->node && ! fc->node->live) { |
2159 | fabric_connection_disconnect(fc); |
2160 | fabric_connection_release(fc); |
2161 | continue; |
2162 | } |
2163 | |
2164 | // Handle remote close, socket errors. |
2165 | // Also triggered by call to cf_socket_shutdown(fc->sock), but only |
2166 | // first call. |
2167 | // Not triggered by cf_socket_close(fc->sock), which automatically |
2168 | // does EPOLL_CTL_DEL. |
2169 | if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { |
2170 | cf_detail(AS_FABRIC, "%lu: epoll : error, will close: fc %p fd %d errno %d signal {err:%d, hup:%d, rdhup:%d}" , |
2171 | worker_id, |
2172 | fc, CSFD(&fc->sock), errno, |
2173 | ((events[i].events & EPOLLERR) ? 1 : 0), |
2174 | ((events[i].events & EPOLLHUP) ? 1 : 0), |
2175 | ((events[i].events & EPOLLRDHUP) ? 1 : 0)); |
2176 | fabric_connection_disconnect(fc); |
2177 | fabric_connection_release(fc); |
2178 | continue; |
2179 | } |
2180 | |
			cf_assert(events[i].events == EPOLLIN, AS_FABRIC, "epoll not set up correctly for %p", fc);
2182 | uint32_t rearm_count = fc->r_rearm_count; |
2183 | |
2184 | if (! fabric_connection_process_readable(fc)) { |
2185 | fabric_connection_disconnect(fc); |
2186 | |
2187 | if (rearm_count == fc->r_rearm_count) { |
2188 | fabric_connection_release(fc); |
2189 | } |
2190 | |
2191 | continue; |
2192 | } |
2193 | } |
2194 | } |
2195 | |
2196 | return NULL; |
2197 | } |
2198 | |
2199 | static void * |
2200 | run_fabric_send(void *arg) |
2201 | { |
2202 | send_entry *se = (send_entry *)arg; |
2203 | cf_poll poll = se->poll; |
2204 | |
2205 | cf_detail(AS_FABRIC, "run_fabric_send() fd %d id %u" , poll.fd, se->id); |
2206 | |
2207 | while (true) { |
2208 | cf_poll_event events[FABRIC_EPOLL_SEND_EVENTS]; |
2209 | int32_t n = cf_poll_wait(poll, events, FABRIC_EPOLL_SEND_EVENTS, -1); |
2210 | |
2211 | for (int32_t i = 0; i < n; i++) { |
2212 | fabric_connection *fc = events[i].data; |
2213 | |
2214 | if (fc->node && ! fc->node->live) { |
2215 | fabric_connection_disconnect(fc); |
2216 | fabric_connection_send_unassign(fc); |
2217 | fabric_connection_release(fc); |
2218 | continue; |
2219 | } |
2220 | |
2221 | // Handle remote close, socket errors. Also triggered by call to |
2222 | // cf_socket_shutdown(fb->sock), but only first call. Not triggered |
2223 | // by cf_socket_close(fb->sock), which automatically EPOLL_CTL_DEL. |
2224 | if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { |
2225 | cf_detail(AS_FABRIC, "epoll : error, will close: fc %p fd %d errno %d signal {err:%d, hup:%d, rdhup:%d}" , |
2226 | fc, CSFD(&fc->sock), errno, |
2227 | ((events[i].events & EPOLLERR) ? 1 : 0), |
2228 | ((events[i].events & EPOLLHUP) ? 1 : 0), |
2229 | ((events[i].events & EPOLLRDHUP) ? 1 : 0)); |
2230 | fabric_connection_disconnect(fc); |
2231 | fabric_connection_send_unassign(fc); |
2232 | fabric_connection_reroute_msg(fc); |
2233 | fabric_connection_release(fc); |
2234 | continue; |
2235 | } |
2236 | |
2237 | if (tls_socket_needs_handshake(&fc->sock)) { |
2238 | if (! fabric_connection_connect_tls(fc)) { |
2239 | fabric_connection_disconnect(fc); |
2240 | fabric_connection_send_unassign(fc); |
2241 | fabric_connection_reroute_msg(fc); |
2242 | fabric_connection_release(fc); |
2243 | } |
2244 | |
2245 | continue; |
2246 | } |
2247 | |
			cf_assert(events[i].events == EPOLLOUT, AS_FABRIC, "epoll not set up correctly for %p", fc);
2249 | |
2250 | if (! fabric_connection_process_writable(fc)) { |
2251 | fabric_connection_disconnect(fc); |
2252 | fabric_connection_send_unassign(fc); |
2253 | fabric_connection_reroute_msg(fc); |
2254 | fabric_connection_release(fc); |
2255 | continue; |
2256 | } |
2257 | } |
2258 | } |
2259 | |
	return NULL;
2261 | } |
2262 | |
2263 | static void * |
2264 | run_fabric_accept(void *arg) |
2265 | { |
2266 | cf_sockets sockset; |
2267 | |
2268 | if (cf_socket_init_server(&g_fabric_bind, &sockset) < 0) { |
2269 | cf_crash(AS_FABRIC, "Could not create fabric listener socket - check configuration" ); |
2270 | } |
2271 | |
2272 | cf_poll_create(&g_accept_poll); |
2273 | cf_poll_add_sockets(g_accept_poll, &sockset, EPOLLIN | EPOLLERR | EPOLLHUP); |
	cf_socket_show_server(AS_FABRIC, "fabric", &sockset);
2275 | |
2276 | while (true) { |
2277 | // Accept new connections on the service socket. |
2278 | cf_poll_event events[64]; |
2279 | int32_t n = cf_poll_wait(g_accept_poll, events, 64, -1); |
2280 | |
2281 | for (int32_t i = 0; i < n; i++) { |
2282 | cf_socket *ssock = events[i].data; |
2283 | |
2284 | if (cf_sockets_has_socket(&sockset, ssock)) { |
2285 | cf_socket csock; |
2286 | cf_sock_addr sa; |
2287 | |
2288 | if (cf_socket_accept(ssock, &csock, &sa) < 0) { |
2289 | if (errno == EMFILE) { |
2290 | cf_warning(AS_FABRIC, "low on file descriptors" ); |
2291 | continue; |
2292 | } |
2293 | else { |
2294 | cf_crash(AS_FABRIC, "cf_socket_accept: %d %s" , errno, cf_strerror(errno)); |
2295 | } |
2296 | } |
2297 | |
2298 | cf_detail(AS_FABRIC, "fabric_accept: accepting new sock %d" , CSFD(&csock)); |
2299 | cf_atomic64_incr(&g_stats.fabric_connections_opened); |
2300 | |
2301 | fabric_connection *fc = fabric_connection_create(&csock, &sa); |
2302 | |
2303 | cf_sock_cfg *cfg = ssock->cfg; |
2304 | |
2305 | if (cfg->owner == CF_SOCK_OWNER_FABRIC_TLS) { |
2306 | tls_socket_prepare_server(g_fabric_tls, &fc->sock); |
2307 | } |
2308 | |
				uint32_t fc_events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
				cf_poll_add_socket(g_accept_poll, &fc->sock, fc_events, fc);
2311 | } |
2312 | else { |
2313 | fabric_connection *fc = events[i].data; |
2314 | |
2315 | if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { |
2316 | fabric_connection_release(fc); |
2317 | continue; |
2318 | } |
2319 | |
2320 | if (tls_socket_needs_handshake(&fc->sock)) { |
2321 | if (! fabric_connection_accept_tls(fc)) { |
2322 | fabric_connection_release(fc); |
2323 | } |
2324 | |
2325 | continue; |
2326 | } |
2327 | |
2328 | if (! fabric_connection_read_fabric_msg(fc)) { |
2329 | fabric_connection_release(fc); |
2330 | continue; |
2331 | } |
2332 | } |
2333 | } |
2334 | } |
2335 | |
	return NULL;
2337 | } |
2338 | |
2339 | static int |
2340 | fabric_rate_node_reduce_fn(const void *key, uint32_t keylen, void *data, |
2341 | void *udata) |
2342 | { |
2343 | fabric_node *node = (fabric_node *)data; |
2344 | fabric_rate *rate = (fabric_rate *)udata; |
2345 | |
2346 | cf_mutex_lock(&node->fc_hash_lock); |
2347 | cf_shash_reduce(node->fc_hash, fabric_rate_fc_reduce_fn, rate); |
2348 | cf_mutex_unlock(&node->fc_hash_lock); |
2349 | |
2350 | return 0; |
2351 | } |
2352 | |
2353 | static int |
2354 | fabric_rate_fc_reduce_fn(const void *key, void *data, void *udata) |
2355 | { |
2356 | fabric_connection *fc = *(fabric_connection **)key; |
2357 | fabric_rate *rate = (fabric_rate *)udata; |
2358 | |
2359 | if (! fc->pool) { |
2360 | return 0; |
2361 | } |
2362 | |
2363 | uint32_t pool_id = fc->pool->pool_id; |
2364 | uint64_t r_bytes = fc->r_bytes; |
2365 | uint64_t s_bytes = fc->s_bytes; |
2366 | |
2367 | rate->r_bytes[pool_id] += r_bytes - fc->r_bytes_last; |
2368 | rate->s_bytes[pool_id] += s_bytes - fc->s_bytes_last; |
2369 | |
2370 | fc->r_bytes_last = r_bytes; |
2371 | fc->s_bytes_last = s_bytes; |
2372 | |
2373 | return 0; |
2374 | } |
2375 | |
2376 | |
2377 | //========================================================== |
2378 | // Heartbeat. |
2379 | // |
2380 | |
2381 | // Set the fabric advertised endpoints. |
2382 | static void |
2383 | fabric_hb_plugin_set_fn(msg *m) |
2384 | { |
2385 | if (m->type == M_TYPE_HEARTBEAT_V2) { |
		// In heartbeat protocol v1 and v2, fabric does not advertise its
		// endpoints, and those versions do not support plugged-in data.
2388 | return; |
2389 | } |
2390 | |
2391 | if (! fabric_published_endpoints_refresh()) { |
2392 | cf_warning(AS_FABRIC, "No publish addresses found for fabric." ); |
2393 | return; |
2394 | } |
2395 | |
2396 | size_t payload_size = 0; |
2397 | |
2398 | if (as_endpoint_list_sizeof( |
2399 | g_published_endpoint_list, &payload_size) != 0) { |
2400 | cf_crash(AS_FABRIC, "Error getting endpoint list size for published addresses." ); |
2401 | } |
2402 | |
2403 | msg_set_buf(m, AS_HB_MSG_FABRIC_DATA, (uint8_t *)g_published_endpoint_list, |
2404 | payload_size, MSG_SET_COPY); |
2405 | } |
2406 | |
// Plugin function that parses the fabric endpoint list out of a heartbeat
// pulse message.
2408 | static void |
2409 | fabric_hb_plugin_parse_data_fn(msg *m, cf_node source, |
2410 | as_hb_plugin_node_data *prev_plugin_data, |
2411 | as_hb_plugin_node_data *plugin_data) |
2412 | { |
2413 | if (m->type == M_TYPE_HEARTBEAT_V2) { |
2414 | plugin_data->data_size = 0; |
2415 | return; |
2416 | } |
2417 | |
2418 | uint8_t *payload = NULL; |
2419 | size_t payload_size = 0; |
2420 | |
2421 | if (msg_get_buf(m, AS_HB_MSG_FABRIC_DATA, &payload, &payload_size, |
2422 | MSG_GET_DIRECT) != 0) { |
2423 | cf_warning(AS_FABRIC, "Unable to read fabric published endpoint list from heartbeat from node %lx" , source); |
2424 | return; |
2425 | } |
2426 | |
2427 | if (payload_size > plugin_data->data_capacity) { |
2428 | // Round up to nearest multiple of block size to prevent very frequent |
2429 | // reallocation. |
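		// E.g. with block size B, a payload of (3 * B) + 1 rounds up to 4 * B.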
2430 | size_t data_capacity = ((payload_size + HB_PLUGIN_DATA_BLOCK_SIZE - 1) / |
2431 | HB_PLUGIN_DATA_BLOCK_SIZE) * HB_PLUGIN_DATA_BLOCK_SIZE; |
2432 | |
2433 | // Reallocate since we have outgrown existing capacity. |
2434 | plugin_data->data = cf_realloc(plugin_data->data, data_capacity); |
2435 | |
2436 | plugin_data->data_capacity = data_capacity; |
2437 | } |
2438 | |
2439 | plugin_data->data_size = payload_size; |
2440 | |
2441 | memcpy(plugin_data->data, payload, payload_size); |
2442 | } |
2443 | |
// Called when a node arrives on or departs from the heartbeat system. On
// arrival, this inserts a new element into the hash table that tracks all TCP
// connections.
2448 | static void |
2449 | fabric_heartbeat_event(int nevents, as_hb_event_node *events, void *udata) |
2450 | { |
2451 | if ((nevents < 1) || (nevents > AS_CLUSTER_SZ) || ! events) { |
2452 | cf_warning(AS_FABRIC, "fabric: received event count of %d" , nevents); |
2453 | return; |
2454 | } |
2455 | |
2456 | for (int i = 0; i < nevents; i++) { |
2457 | switch (events[i].evt) { |
2458 | case AS_HB_NODE_ARRIVE: { |
2459 | fabric_node *node = fabric_node_get_or_create(events[i].nodeid); |
2460 | fabric_node_release(node); // for node_get_or_create() |
2461 | |
2462 | cf_info(AS_FABRIC, "fabric: node %lx arrived" , events[i].nodeid); |
2463 | } |
2464 | break; |
2465 | case AS_HB_NODE_DEPART: |
2466 | cf_info(AS_FABRIC, "fabric: node %lx departed" , events[i].nodeid); |
2467 | fabric_node_disconnect(events[i].nodeid); |
2468 | break; |
2469 | case AS_HB_NODE_ADJACENCY_CHANGED: |
2470 | // Not relevant to fabric. |
2471 | break; |
2472 | default: |
2473 | cf_warning(AS_FABRIC, "fabric: received unknown event type %d %lx" , events[i].evt, events[i].nodeid); |
2474 | break; |
2475 | } |
2476 | } |
2477 | } |
2478 | |