1/*
2 * service_list.c
3 *
4 * Copyright (C) 2017-2018 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "fabric/service_list.h"
28
29#include <errno.h>
30#include <pthread.h>
31#include <stdbool.h>
32#include <stddef.h>
33#include <stdint.h>
34#include <stdlib.h>
35#include <string.h>
36#include <time.h>
37#include <unistd.h>
38
39#include <sys/time.h>
40
41#include "cf_mutex.h"
42#include "cf_str.h"
43#include "cf_thread.h"
44#include "dynbuf.h"
45#include "fault.h"
46#include "msg.h"
47#include "node.h"
48#include "shash.h"
49#include "socket.h"
50
51#include "base/security.h"
52#include "base/service.h"
53#include "base/thr_info.h"
54
55#include "citrusleaf/alloc.h"
56#include "citrusleaf/cf_clock.h"
57#include "citrusleaf/cf_hash_math.h"
58#include "citrusleaf/cf_queue.h"
59
60#include "fabric/clustering.h"
61#include "fabric/exchange.h"
62#include "fabric/fabric.h"
63#include "fabric/hb.h"
64
65#include "warnings.h"
66
67
68//==========================================================
69// Typedefs & constants.
70//
71
// Fixed size (incl. NUL padding) of a key in the g_info hash table.
#define HASH_STR_SZ 50

// NOTE(review): not referenced in this part of the file - confirm usage.
#define DEFAULT_PORT "3000"

// Field IDs of the service fabric message (M_TYPE_INFO).
typedef enum {
	// These values go on the wire, so mind backward compatibility if changing.
	FIELD_OP,			// operation - OP_UPDATE, OP_ACK, or OP_UPDATE_REQ
	FIELD_GEN,			// generation - always 1 (see update_reduce())
	FIELD_SERV,			// sender's "services" value
	FIELD_SERV_ALT,		// sender's "services-alternate" value
	FIELD_CLEAR_STD,	// sender's "peers-clear-std" value
	FIELD_TLS_STD,		// sender's "peers-tls-std" value
	FIELD_CLEAR_ALT,	// sender's "peers-clear-alt" value
	FIELD_TLS_ALT,		// sender's "peers-tls-alt" value
	FIELD_TLS_NAME,		// sender's TLS name

	NUM_FIELDS
} services_msg_field;

// Message operations - see the state table below for when each is sent.
#define OP_UPDATE 0		// carries our service info
#define OP_ACK 1		// acknowledges a received update
#define OP_UPDATE_REQ 2	// like OP_UPDATE, but also requests one in return
94
95// got_update | got_ack | Action on update thread
96// ------------+-----------+------------------------------------------------
97// false | false | Send OP_UPDATE_REQ
98// | | We don't know the peer and it doesn't know us.
99// | |
100// false | true | Send OP_UPDATE_REQ
101// | | We don't know the peer, but it knows us.
102// | |
103// true | false | Send OP_UPDATE
104// | | We know the peer, but it doesn't know us.
105// | |
106// true | true | None
107// | | We know each other.
108
109typedef struct peer_s {
110 bool present; // peer is a current peer (vs. an alumnus)
111
112 bool got_update; // we saw an update from this peer
113 bool got_ack; // we saw an acknowledgment from this peer
114 uint64_t retrans_at_ms; // time of next retransmission for this peer
115
116 uint64_t in_gen; // generation of last incoming change
117
118 char *serv; // goes into "services"
119 char *serv_alt; // goes into "services-alternate"
120
121 char *clear_std; // goes into "peers-clear-std"
122 char *tls_std; // goes into "peers-tls-std"
123 char *clear_alt; // goes into "peers-clear-alt"
124 char *tls_alt; // goes into "peers-tls-alt"
125 char *tls_name; // peer's TLS name
126} peer_t;
127
128// Maps the given peer_t to a field in the peer_t.
129typedef char **(*proj_t)(peer_t *p);
130
131typedef struct filter_s {
132 bool tls; // when set, include the TLS name
133 bool present; // when set, exclude alumni
134 uint64_t since; // base generation for a delta build
135} filter_t;
136
137// Builds an info value in g_info. Reduces g_peers, applies the given filter,
138// and selects the peer_t field given by the projection function.
139typedef void (*build_t)(cf_dyn_buf *db, proj_t proj, const filter_t *filter);
140
141// How to turn the peer_t entries (g_peers) into an info value (g_info).
142typedef struct peer_val_s {
143 const char *key; // info value's key name, e.g., "services"
144 proj_t proj; // projection function for the underlying
145 // field in peer_t
146 build_t build; // build function to update the info value
147 const filter_t
148 *filter; // filter passed to the build function
149} peer_val_t;
150
151// How to turn our own services (g_local) into an info value (g_info).
152typedef struct local_val_s {
153 const char *key; // key identifier
154 proj_t proj; // projection function that selects the
155 // corresponding field from g_local
156} local_val_t;
157
158// Links fabric message fields to their peer_t fields.
159typedef struct field_proj_s {
160 int32_t field; // message field identifier (FIELD_*)
161 proj_t proj; // projection function that selects the
162 // corresponding field from a given peer_t
163} field_proj_t;
164
165// Context for building an info value. Passed to the build function.
166typedef struct print_par_s {
167 cf_dyn_buf *db; // the buffer to print to
168 proj_t proj; // the field to print
169 const char *strip; // what to strip from the end of the field
170 bool tls; // when set, includes the TLS name
171 bool present; // when set, excludes alumni
172 uint64_t since; // the base generation for a delta build
173 uint32_t count; // the number of already printed fields
174} print_par_t;
175
176// Context for the update thread's g_peers reduce.
177typedef struct update_ctx_s {
178 uint64_t now_ms; // current time
179 uint64_t retrans_at_ms; // minimum of retransmission times, across all
180 // peer_t entries in g_peers
181} update_ctx_t;
182
// Wire layout of the service fabric message - order and types must stay
// in sync with services_msg_field, which the COMPILER_ASSERT() verifies.
static const msg_template MSG_TEMP[] = {
		{ FIELD_OP, M_FT_UINT32 },
		{ FIELD_GEN, M_FT_UINT32 },
		{ FIELD_SERV, M_FT_STR },
		{ FIELD_SERV_ALT, M_FT_STR },
		{ FIELD_CLEAR_STD, M_FT_STR },
		{ FIELD_TLS_STD, M_FT_STR },
		{ FIELD_CLEAR_ALT, M_FT_STR },
		{ FIELD_TLS_ALT, M_FT_STR },
		{ FIELD_TLS_NAME, M_FT_STR }
};

COMPILER_ASSERT(sizeof(MSG_TEMP) / sizeof(msg_template) == NUM_FIELDS);

// Scratch buffer size for incoming service messages.
#define SCRATCH_SZ 512

// Delay before re-sending to a peer that hasn't fully synced with us.
#define RETRANS_INTERVAL_MS 1000

// Canned filters for the info values maintained by recalc() - see the
// PEER_VALS function table.

static const filter_t PEERS_CLEAR = {
		.tls = false, .present = true, .since = 0
};

static const filter_t PEERS_TLS = {
		.tls = true, .present = true, .since = 0
};

static const filter_t ALUMNI_CLEAR = {
		.tls = false, .present = false, .since = 0
};

static const filter_t ALUMNI_TLS = {
		.tls = true, .present = false, .since = 0
};
215
216
217//==========================================================
218// Forward declarations.
219//
220
221// Peer management.
222
223static char **proj_serv(peer_t *p); // maps p to &p->serv
224static char **proj_serv_alt(peer_t *p); // maps p to &p->serv_alt
225static char **proj_clear_std(peer_t *p); // etc.
226static char **proj_tls_std(peer_t *p);
227static char **proj_clear_alt(peer_t *p);
228static char **proj_tls_alt(peer_t *p);
229static char **proj_tls_name(peer_t *p);
230
231static peer_t *create_peer(cf_node node);
232static peer_t *find_peer(cf_node node);
233static void dump_peer(cf_node node, const peer_t *p);
234static void set_present(const cf_node *nodes, uint32_t n_nodes, bool present);
235
236static int32_t purge_alumni_reduce(const void *key, void *data, void *udata);
237static void purge_alumni(void);
238
239static void handle_cluster_change(const as_exchange_cluster_changed_event *ev,
240 void *udata);
241static int32_t handle_fabric_message(cf_node node, msg *m, void *udata);
242
243static int32_t update_reduce(const void *key, void *data, void *udata);
244static void *run_update_thread(void *udata);
245static void wake_up(void);
246
247// Local node information.
248
249static char *print_list(const as_service_endpoint *endp, uint32_t limit,
250 char sep, bool legacy);
251static void enum_addrs(cf_addr_list *addrs);
252static void free_addrs(cf_addr_list *addrs);
253static void populate_local(void);
254
255// Info value management.
256
257static void set_info_val(const char *key, const char *val);
258static void print_info_val(const char *key, cf_dyn_buf *db);
259
260static int32_t build_peers_reduce(const void *key, void *data, void *udata);
261static void build_peers(cf_dyn_buf *db, proj_t proj, const filter_t *filter);
262
263static int32_t build_services_reduce(const void *key, void *data, void *udata);
264static void build_services(cf_dyn_buf *db, proj_t proj, const filter_t *filter);
265
266static void build_gen(cf_dyn_buf *db, proj_t proj, const filter_t *filter);
267
268static void recalc(void);
269
270// Miscellaneous.
271
272static const char *op_str(uint32_t op);
273static char *strip_suff(const char *in, const char *suff, char *out);
274
275
276//==========================================================
277// Inlines & macros.
278//
279
280#define ARRAY_COUNT(_a) (sizeof(_a) / (sizeof((_a)[0])))
281
282
283//==========================================================
284// Function tables.
285//
286
287static const peer_val_t PEER_VALS[] = {
288// key proj build filter
289 { "peers-generation", NULL, build_gen, NULL },
290 { "peers-clear-std", proj_clear_std, build_peers, &PEERS_CLEAR },
291 { "peers-clear-alt", proj_clear_alt, build_peers, &PEERS_CLEAR },
292 { "peers-tls-std", proj_tls_std, build_peers, &PEERS_TLS },
293 { "peers-tls-alt", proj_tls_alt, build_peers, &PEERS_TLS },
294 { "alumni-clear-std", proj_clear_std, build_peers, &ALUMNI_CLEAR },
295 { "alumni-tls-std", proj_tls_std, build_peers, &ALUMNI_TLS },
296 { "services", proj_serv, build_services, &PEERS_CLEAR },
297 { "services-alternate", proj_serv_alt, build_services, &PEERS_CLEAR },
298 { "services-alumni", proj_serv, build_services, &ALUMNI_CLEAR }
299};
300
301static const local_val_t LOCAL_VALS[] = {
302 { "service-clear-std", proj_clear_std },
303 { "service-clear-alt", proj_clear_alt },
304 { "service-tls-std", proj_tls_std },
305 { "service-tls-alt", proj_tls_alt },
306 { "service", proj_serv }
307};
308
309static const field_proj_t FIELD_PROJS[] = {
310 { FIELD_CLEAR_STD, proj_clear_std },
311 { FIELD_CLEAR_ALT, proj_clear_alt },
312 { FIELD_TLS_STD, proj_tls_std },
313 { FIELD_TLS_ALT, proj_tls_alt },
314 { FIELD_TLS_NAME, proj_tls_name },
315 { FIELD_SERV, proj_serv },
316 { FIELD_SERV_ALT, proj_serv_alt }
317};
318
319
320//==========================================================
321// Globals.
322//
323
324// Counts incoming peer changes.
325static uint64_t g_in_gen = 0;
326
327// Locking order: g_peers_lock before g_info_lock.
328
329static cf_mutex g_peers_lock = CF_MUTEX_INIT;
330static pthread_rwlock_t g_info_lock = PTHREAD_RWLOCK_INITIALIZER;
331
332// These hash tables are created without locks.
333
334static cf_shash *g_peers;
335static cf_shash *g_info;
336
337// "Peer" that holds our information.
338static peer_t g_local;
339
340// Signals the update thread.
341static cf_queue g_wake_up;
342
343
344//==========================================================
345// Public API.
346//
347
// Module bootstrap. Creates the global state, determines our own service
// addresses, computes the initial info values, and only then registers
// the fabric/exchange callbacks and starts the update thread - so all
// state exists before any callback can fire.
void
as_service_list_init(void)
{
	cf_detail(AS_SERVICE_LIST, "initializing service list");

	// These hash tables are created without locks.

	g_info = cf_shash_create(cf_shash_fn_zstr, HASH_STR_SZ,
			sizeof(char *), 32, 0);

	g_peers = cf_shash_create(cf_nodeid_shash_fn, sizeof(cf_node),
			sizeof(peer_t *), AS_CLUSTER_SZ, 0);

	// The queue is only a wake-up signal for the update thread.

	cf_queue_init(&g_wake_up, sizeof(uint8_t), 1, true);

	// Our own services (g_local) and the initial info values (g_info).

	populate_local();
	recalc();

	// From here on, callbacks may fire.

	as_fabric_register_msg_fn(M_TYPE_INFO, MSG_TEMP, sizeof(MSG_TEMP),
			SCRATCH_SZ, handle_fabric_message, NULL);

	as_exchange_register_listener(handle_cluster_change, NULL);
	cf_thread_create_detached(run_update_thread, NULL);
}
372
373int32_t
374as_service_list_dynamic(char *key, cf_dyn_buf *db)
375{
376 cf_detail(AS_SERVICE_LIST, "handling info value %s", key);
377
378 if (strcmp(key, "services-alumni-reset") == 0) {
379 purge_alumni();
380 cf_dyn_buf_append_string(db, "ok");
381 return 0;
382 }
383
384 print_info_val(key, db);
385 return 0;
386}
387
388int32_t
389as_service_list_command(char *key, char *par, cf_dyn_buf *db)
390{
391 cf_detail(AS_SERVICE_LIST, "handling info command %s %s", key, par);
392
393 uint64_t since = 0;
394
395 // Hack to avoid generic parameter parsing for now, no error checking ...
396 static const char prefix[] = "generation=";
397 static const size_t prefix_len = sizeof(prefix) - 1;
398
399 if (strncmp(par, prefix, prefix_len) == 0) {
400 since = strtoul(par + prefix_len, NULL, 10);
401 }
402
403 // Find the build and projection functions for the given key.
404
405 const peer_val_t *peer_val;
406
407 for (uint32_t i = 0; i < ARRAY_COUNT(PEER_VALS); ++i) {
408 peer_val = PEER_VALS + i;
409
410 if (strcmp(peer_val->key, key) == 0) {
411 break;
412 }
413 }
414
415 // Build the info value directly into the given db, instead of
416 // storing it in g_info as recalc() does.
417
418 const filter_t *val_par = peer_val->filter;
419 cf_mutex_lock(&g_peers_lock);
420
421 peer_val->build(db, peer_val->proj, &(filter_t){
422 .tls = val_par->tls, .present = false, .since = since
423 });
424
425 cf_mutex_unlock(&g_peers_lock);
426 return 0;
427}
428
429
430//==========================================================
431// Local helpers - peer management.
432//
433
// Projection functions. Each one maps a peer_t to the address of one of
// its string fields, so that generic code (the function tables and the
// message field loops) can read and write all fields uniformly.

static char **
proj_serv(peer_t *p)
{
	return &p->serv;
}

static char **
proj_serv_alt(peer_t *p)
{
	return &p->serv_alt;
}

static char **
proj_clear_std(peer_t *p)
{
	return &p->clear_std;
}

static char **
proj_tls_std(peer_t *p)
{
	return &p->tls_std;
}

static char **
proj_clear_alt(peer_t *p)
{
	return &p->clear_alt;
}

static char **
proj_tls_alt(peer_t *p)
{
	return &p->tls_alt;
}

static char **
proj_tls_name(peer_t *p)
{
	return &p->tls_name;
}
475
476static peer_t *
477create_peer(cf_node node)
478{
479 cf_detail(AS_SERVICE_LIST, "new peer %lx", node);
480 peer_t *p = cf_calloc(1, sizeof(peer_t));
481
482 p->present = false;
483
484 p->got_update = false;
485 p->got_ack = false;
486 p->retrans_at_ms = 0;
487
488 p->in_gen = 0;
489
490 p->serv = cf_strdup("");
491 p->serv_alt = cf_strdup("");
492
493 p->clear_std = cf_strdup("");
494 p->tls_std = cf_strdup("");
495 p->clear_alt = cf_strdup("");
496 p->tls_alt = cf_strdup("");
497 p->tls_name = cf_strdup("");
498
499 int32_t res = cf_shash_put_unique(g_peers, &node, &p);
500
501 cf_assert(res == CF_SHASH_OK, AS_SERVICE_LIST,
502 "cf_shash_put_unique() failed: %d", res);
503
504 cf_detail(AS_SERVICE_LIST, "added peer %lx", node);
505 return p;
506}
507
508static peer_t *
509find_peer(cf_node node)
510{
511 cf_detail(AS_SERVICE_LIST, "finding peer %lx", node);
512 peer_t *p;
513 int32_t res = cf_shash_get(g_peers, &node, &p);
514
515 switch (res) {
516 case CF_SHASH_OK:
517 return p;
518
519 case CF_SHASH_ERR_NOT_FOUND:
520 return create_peer(node);
521
522 default:
523 cf_crash(AS_SERVICE_LIST, "cf_shash_get() failed: %d", res);
524 return NULL; // not reached
525 }
526}
527
// Log every field of the given peer_t at detail level. Also used by
// populate_local(), which passes node 0 to dump g_local.
static void
dump_peer(cf_node node, const peer_t *p)
{
	cf_detail(AS_SERVICE_LIST, "--------------------- peer change %016lx",
			node);

	cf_detail(AS_SERVICE_LIST, "present %d", (int32_t)p->present);
	cf_detail(AS_SERVICE_LIST, "got_update %d", (int32_t)p->got_update);
	cf_detail(AS_SERVICE_LIST, "got_ack %d", (int32_t)p->got_ack);
	// Only the low digits of the timestamp - enough to correlate logs.
	cf_detail(AS_SERVICE_LIST, "retrans_at_ms %d",
			(int32_t)(p->retrans_at_ms % 1000000));
	cf_detail(AS_SERVICE_LIST, "in_gen %lu", p->in_gen);
	cf_detail(AS_SERVICE_LIST, "serv %s", p->serv);
	cf_detail(AS_SERVICE_LIST, "serv_alt %s", p->serv_alt);
	cf_detail(AS_SERVICE_LIST, "clear_std %s", p->clear_std);
	cf_detail(AS_SERVICE_LIST, "tls_std %s", p->tls_std);
	cf_detail(AS_SERVICE_LIST, "clear_alt %s", p->clear_alt);
	cf_detail(AS_SERVICE_LIST, "tls_alt %s", p->tls_alt);
	cf_detail(AS_SERVICE_LIST, "tls_name %s", p->tls_name);
}
548
549static void
550set_present(const cf_node *nodes, uint32_t n_nodes, bool present)
551{
552 for (uint32_t i = 0; i < n_nodes; ++i) {
553 peer_t *p = find_peer(nodes[i]);
554
555 p->present = present;
556
557 if (present && p->got_update && p->got_ack) {
558 p->retrans_at_ms = 0;
559 }
560
561 p->in_gen = ++g_in_gen;
562
563 dump_peer(nodes[i], p);
564 }
565}
566
567static int32_t
568purge_alumni_reduce(const void *key, void *data, void *udata)
569{
570 cf_node node = *(const cf_node *)key;
571 peer_t *p = *(peer_t **)data;
572 (void)udata;
573
574 cf_detail(AS_SERVICE_LIST, "visiting node %lx", node);
575
576 if (p->present) {
577 cf_detail(AS_SERVICE_LIST, "node present");
578 return CF_SHASH_OK;
579 }
580
581 cf_detail(AS_SERVICE_LIST, "deleting alumnus");
582 ++g_in_gen;
583 return CF_SHASH_REDUCE_DELETE;
584}
585
586static void
587purge_alumni(void)
588{
589 cf_detail(AS_SERVICE_LIST, "purging alumni");
590 cf_mutex_lock(&g_peers_lock);
591
592 uint64_t old_gen = g_in_gen;
593
594 cf_shash_reduce(g_peers, purge_alumni_reduce, NULL);
595
596 if (g_in_gen > old_gen) {
597 recalc();
598 }
599
600 cf_mutex_unlock(&g_peers_lock);
601}
602
603// handle_cluster_change() is authoritative for who's currently a peer and
604// who's just an alumnus, i.e., a former peer. It does two things:
605//
606// 1. It adds previously unknown nodes to g_peers.
607//
608// 2. It manages the peer_t::present field: true = peer, false = alumnus.
609//
610// The other peer_t fields are managed by handle_fabric_message().
611
// Exchange listener. Diffs the new succession list against the previous
// one, updates peer_t::present accordingly, and rebuilds the info values
// if anything changed.
static void
handle_cluster_change(const as_exchange_cluster_changed_event *ev, void *udata)
{
	cf_detail(AS_SERVICE_LIST, "------------------ cluster change %016lx",
			ev->cluster_key);
	(void)udata;

	// The previous succession list.

	// NOTE(review): the static state assumes this listener is invoked
	// serially - confirm against the exchange module's callback contract.

	static cf_node suc_old[AS_CLUSTER_SZ];
	static uint32_t sz_old = 0;

	// Remove ourselves from the new succession list.

	cf_node suc_new[AS_CLUSTER_SZ];
	uint32_t sz_new = 0;

	for (uint32_t i = 0; i < ev->cluster_size; ++i) {
		if (ev->succession[i] == g_config.self_node) {
			continue;
		}

		suc_new[sz_new] = ev->succession[i];
		++sz_new;
	}

	as_clustering_log_cf_node_array(CF_DETAIL, AS_SERVICE_LIST, "new peers",
			suc_new, (int32_t)sz_new);
	as_clustering_log_cf_node_array(CF_DETAIL, AS_SERVICE_LIST, "old peers",
			suc_old, (int32_t)sz_old);

	cf_node add[AS_CLUSTER_SZ];
	uint32_t n_add = 0;

	cf_node rem[AS_CLUSTER_SZ];
	uint32_t n_rem = 0;

	uint32_t i_old = 0;
	uint32_t i_new = 0;

	// Calculate the differences between the old and the new peers.

	// This assumes that a succession list contains the node IDs in
	// descending order.

	// Classic sorted-list merge: one pass over both lists, collecting
	// nodes only in the new list into add[] and nodes only in the old
	// list into rem[]. Node ID 0 marks an exhausted list; since real
	// node IDs are non-zero, 0 compares below everything.

	while (i_old < sz_old || i_new < sz_new) {
		cf_node node_old = i_old < sz_old ? suc_old[i_old] : 0;
		cf_node node_new = i_new < sz_new ? suc_new[i_new] : 0;

		// Old succession list skipped ahead of new succession list.
		// (Or we hit the end of the old succession list.)

		if (node_old < node_new) {
			add[n_add] = node_new;
			++n_add;
			++i_new;
			continue;
		}

		// New succession list skipped ahead of old succession list.
		// (Or we hit the end of the new succession list.)

		if (node_new < node_old) {
			rem[n_rem] = node_old;
			++n_rem;
			++i_old;
			continue;
		}

		// Node is in both lists - unchanged.

		++i_old;
		++i_new;
	}

	as_clustering_log_cf_node_array(CF_DETAIL, AS_SERVICE_LIST, "peers add",
			add, (int32_t)n_add);
	as_clustering_log_cf_node_array(CF_DETAIL, AS_SERVICE_LIST, "peers rem",
			rem, (int32_t)n_rem);

	if (n_add + n_rem > 0) {
		cf_mutex_lock(&g_peers_lock);

		set_present(add, n_add, true);
		set_present(rem, n_rem, false);
		recalc();

		cf_mutex_unlock(&g_peers_lock);

		// Newly added peers may need an OP_UPDATE_REQ.

		wake_up();
	}

	// Next time, new succession list will be the old succession list.

	for (uint32_t i = 0; i < sz_new; ++i) {
		suc_old[i] = suc_new[i];
	}

	sz_old = sz_new;
}
709
710// handle_fabric_message() manages what we know about another node. It does
711// two things:
712//
713// 1. It adds previously unknown nodes to g_peers.
714//
715// 2. It manages all peer_t fields, except peer_t::present. It doesn't care
716// whether a node is currently a peer (present = true) or just an alumnus
717// (present = false). It blindly updates the peer_t fields, always.
718//
719// peer_t::present is managed by handle_cluster_change().
720
// Process an incoming service message (OP_UPDATE, OP_UPDATE_REQ, or
// OP_ACK) from the given node. Consumes m on all paths - either via
// as_fabric_msg_put() or by reusing it to send the OP_ACK reply.
// Always returns 0.
static int32_t
handle_fabric_message(cf_node node, msg *m, void *udata)
{
	(void)udata;
	cf_detail(AS_SERVICE_LIST, "------------------ fabric message %016lx",
			node);

	// Get operation and generation.

	uint32_t op;

	if (msg_get_uint32(m, FIELD_OP, &op) < 0) {
		cf_warning(AS_SERVICE_LIST, "op-less service message from node %lx",
				node);
		as_fabric_msg_put(m);
		return 0;
	}

	uint32_t gen;

	if (msg_get_uint32(m, FIELD_GEN, &gen) < 0) {
		cf_warning(AS_SERVICE_LIST, "gen-less service message from node %lx",
				node);
		as_fabric_msg_put(m);
		return 0;
	}

	cf_detail(AS_SERVICE_LIST, "op %s gen %u", op_str(op), gen);

	cf_mutex_lock(&g_peers_lock);

	peer_t *p = find_peer(node);
	bool change = false;

	if (op == OP_ACK) {
		cf_detail(AS_SERVICE_LIST, "OP_ACK %u from %lx", gen, node);

		// Set peer_t::ack.

		change = change || !p->got_ack;
		p->got_ack = true;

		// Fully synced current peer - stop retransmitting to it.

		if (p->present && p->got_update) {
			change = change || p->retrans_at_ms != 0;
			p->retrans_at_ms = 0;
		}

		// An ACK carries no service data, so no in_gen bump and no
		// recalc() - change only controls the debug dump here.

		if (change) {
			dump_peer(node, p);
		}

		cf_mutex_unlock(&g_peers_lock);
		as_fabric_msg_put(m);

		return 0;
	}

	if (op != OP_UPDATE && op != OP_UPDATE_REQ) {
		cf_warning(AS_SERVICE_LIST, "invalid service list op %d from node %lx",
				op, node);

		cf_mutex_unlock(&g_peers_lock);
		as_fabric_msg_put(m);

		return 0;
	}

	if (op == OP_UPDATE) {
		cf_detail(AS_SERVICE_LIST, "OP_UPDATE from %lx", node);
	}
	else {
		cf_detail(AS_SERVICE_LIST, "OP_UPDATE_REQ from %lx", node);

		// Clear peer_t::ack.

		// The peer wants an update from us. Clearing got_ack makes the
		// update thread (re)send one - see the state table above.

		change = change || p->got_ack;
		p->got_ack = false;

		wake_up();
	}

	// Set peer_t::update.

	change = change || !p->got_update;
	p->got_update = true;

	if (p->present && p->got_ack) {
		change = change || p->retrans_at_ms != 0;
		p->retrans_at_ms = 0;
	}

	// Populate peer_t from message fields.

	for (uint32_t i = 0; i < ARRAY_COUNT(FIELD_PROJS); ++i) {
		char **to = FIELD_PROJS[i].proj(p);
		char *old = *to;

		// We follow the convention of the old code, which omits
		// empty fields from the fabric message. So let's be prepared
		// for missing fields!

		if (msg_get_str(m, FIELD_PROJS[i].field, to, MSG_GET_COPY_MALLOC) < 0) {
			*to = cf_strdup("");
		}

		change = change || strcmp(*to, old) != 0;
		cf_free(old);
	}

	// Only bump the generation and rebuild the info values if something
	// actually changed.

	if (change) {
		p->in_gen = ++g_in_gen;
		dump_peer(node, p);
		recalc();
	}

	// Send ACK.

	cf_detail(AS_SERVICE_LIST, "sending OP_ACK to %lx", node);

	// Reuse the incoming message - keep its generation, flip the op.

	msg_preserve_fields(m, 1, FIELD_GEN);
	msg_set_uint32(m, FIELD_OP, OP_ACK);

	int32_t res = as_fabric_send(node, m, AS_FABRIC_CHANNEL_CTRL);

	if (res != AS_FABRIC_SUCCESS) {
		cf_warning(AS_SERVICE_LIST, "error while sending OP_ACK to %lx: %d",
				node, res);
		cf_mutex_unlock(&g_peers_lock);
		as_fabric_msg_put(m);
		return 0;
	}

	cf_mutex_unlock(&g_peers_lock);

	// No as_fabric_msg_put(), since we reused the original message to
	// send the ACK.

	return 0;
}
860
// Per-peer worker for the update thread's g_peers reduce. Decides whether
// the peer needs an OP_UPDATE or OP_UPDATE_REQ right now, sends it, and
// maintains ctx->retrans_at_ms as the earliest upcoming retransmission
// time across all peers (0 = nothing pending). Caller holds g_peers_lock.
static int32_t
update_reduce(const void *key, void *data, void *udata)
{
	cf_node node = *(const cf_node *)key;
	peer_t *p = *(peer_t **)data;
	update_ctx_t *ctx = udata;

	cf_detail(AS_SERVICE_LIST,
			"updating %lx - present %d got_update %d got_ack %d", node,
			(int32_t)p->present, (int32_t)p->got_update, (int32_t)p->got_ack);

	// If it's an alumnus, don't update.

	if (!p->present) {
		cf_detail(AS_SERVICE_LIST, "skipping alumnus");
		return CF_SHASH_OK;
	}

	// If we don't need anything from the peer and the peer doesn't need
	// anything from us (i.e., it acknowledged), we're done.

	if (p->got_update && p->got_ack) {
		cf_detail(AS_SERVICE_LIST, "nothing to be done");
		return CF_SHASH_OK;
	}

	// If it's not yet time to transmit, then don't. Also calculate
	// ctx->retrans_at_ms as the minimum across all peers.

	if (p->retrans_at_ms != 0) {
		cf_detail(AS_SERVICE_LIST, "retrans_at_ms %d now_ms %d",
				(int32_t)(p->retrans_at_ms % 1000000),
				(int32_t)(ctx->now_ms % 1000000));

		if (ctx->now_ms < p->retrans_at_ms) {
			cf_detail(AS_SERVICE_LIST, "not yet");

			if (ctx->retrans_at_ms == 0 ||
					p->retrans_at_ms < ctx->retrans_at_ms) {
				ctx->retrans_at_ms = p->retrans_at_ms;
			}

			return CF_SHASH_OK;
		}
	}
	else {
		cf_detail(AS_SERVICE_LIST, "no retrans");
	}

	// If we never got an update from a peer, request one from the peer
	// (OP_UPDATE_REQ); otherwise don't (OP_UPDATE).

	// This is handy after a restart. The peers won't notice that we restarted
	// and won't send us an update. So, we ask them by sending OP_UPDATE_REQ.

	uint32_t op = p->got_update ? OP_UPDATE : OP_UPDATE_REQ;

	// Compose outgoing message.

	msg *m = as_fabric_msg_get(M_TYPE_INFO);
	msg_set_uint32(m, FIELD_OP, op);

	// We don't support dynamically changing interface configurations any
	// longer, so we'll only ever have generation 1.

	msg_set_uint32(m, FIELD_GEN, 1);

	// Populate fields from g_local.

	for (uint32_t i = 0; i < ARRAY_COUNT(FIELD_PROJS); ++i) {
		char **from = FIELD_PROJS[i].proj(&g_local);

		// We follow the convention of the old code, which omits empty fields
		// from the fabric message.

		if ((*from)[0] != 0) {
			msg_set_str(m, FIELD_PROJS[i].field, *from, MSG_SET_COPY);
		}
	}

	// Send fabric message.

	cf_detail(AS_SERVICE_LIST, "sending %s to %lx", op_str(op), node);

	int32_t res = as_fabric_send(node, m, AS_FABRIC_CHANNEL_CTRL);

	if (res == AS_FABRIC_ERR_NO_NODE) {
		cf_detail(AS_SERVICE_LIST, "unknown node %lx", node);
		as_fabric_msg_put(m);
	}
	else if (res != AS_FABRIC_SUCCESS) {
		cf_warning(AS_SERVICE_LIST, "error while sending %s to %lx: %d",
				op_str(op), node, res);
		as_fabric_msg_put(m);
	}

	// Schedule the next retransmission unconditionally - send failures
	// are retried the same way as unacknowledged sends.

	p->retrans_at_ms = ctx->now_ms + RETRANS_INTERVAL_MS;

	if (ctx->retrans_at_ms == 0 || p->retrans_at_ms < ctx->retrans_at_ms) {
		ctx->retrans_at_ms = p->retrans_at_ms;
	}

	cf_detail(AS_SERVICE_LIST, "retrans_at_ms %d now_ms %d",
			(int32_t)(p->retrans_at_ms % 1000000),
			(int32_t)(ctx->now_ms % 1000000));

	return CF_SHASH_OK;
}
969
970static void *
971run_update_thread(void *udata)
972{
973 (void)udata;
974
975 while (true) {
976 update_ctx_t ctx = {
977 .now_ms = cf_getms(), .retrans_at_ms = 0
978 };
979
980 cf_mutex_lock(&g_peers_lock);
981
982 cf_detail(AS_SERVICE_LIST,
983 "----------------------------------------- updating");
984 cf_shash_reduce(g_peers, update_reduce, &ctx);
985
986 cf_mutex_unlock(&g_peers_lock);
987
988 int32_t wait;
989
990 if (ctx.retrans_at_ms == 0) {
991 wait = CF_QUEUE_FOREVER;
992 cf_detail(AS_SERVICE_LIST, "sleeping forever");
993 }
994 else {
995 wait = (int32_t)(ctx.retrans_at_ms - ctx.now_ms);
996 cf_detail(AS_SERVICE_LIST, "sleeping %d ms", wait);
997 }
998
999 uint8_t dummy;
1000 cf_queue_pop(&g_wake_up, &dummy, wait);
1001 }
1002
1003 return NULL; // not reached
1004}
1005
1006static void
1007wake_up(void)
1008{
1009 cf_detail(AS_SERVICE_LIST, "waking up update thread");
1010
1011 static uint8_t dummy = 0;
1012 cf_queue_push(&g_wake_up, &dummy);
1013}
1014
1015
1016//==========================================================
1017// Local helpers - local node information.
1018//
1019
// Render the given endpoint's address list as one sep-separated string.
// Returns a heap-allocated string owned by the caller, or NULL if the
// service is inactive (port == 0) or no address survives the filters.
// limit == 0 means unlimited; legacy restricts output to legacy
// (presumably IPv4 - confirm against cf_ip_addr_str_is_legacy()) entries.
static char *
print_list(const as_service_endpoint *endp, uint32_t limit, char sep,
		bool legacy)
{
	cf_detail(AS_SERVICE_LIST, "printing list - count %u port %hu limit %u "
			"legacy %d", endp->addrs.n_addrs, endp->port, limit,
			(int32_t)legacy);

	if (endp->port == 0) {
		cf_detail(AS_SERVICE_LIST, "service inactive");
		return NULL;
	}

	legacy = legacy || cf_ip_addr_legacy_only();

	cf_dyn_buf_define(db);
	uint32_t n_out = 0;

	for (uint32_t i = 0; i < endp->addrs.n_addrs &&
			(limit == 0 || n_out < limit); ++i) {
		cf_detail(AS_SERVICE_LIST, "adding %s", endp->addrs.addrs[i]);

		if (legacy && !cf_ip_addr_str_is_legacy(endp->addrs.addrs[i])) {
			cf_detail(AS_SERVICE_LIST, "skipping non-legacy");
			continue;
		}

		if (n_out > 0) {
			cf_dyn_buf_append_char(&db, sep);
		}

		// DNS names are emitted verbatim as "name:port"; IP addresses go
		// through cf_sock_addr for canonical formatting.

		if (cf_ip_addr_is_dns_name(endp->addrs.addrs[i])) {
			cf_dyn_buf_append_string(&db, endp->addrs.addrs[i]);
			cf_dyn_buf_append_char(&db, ':');
			cf_dyn_buf_append_string(&db, cf_ip_port_print(endp->port));
		}
		else {
			cf_sock_addr addr;
			CF_NEVER_FAILS(cf_sock_addr_from_host_port(endp->addrs.addrs[i],
					endp->port, &addr));
			cf_dyn_buf_append_string(&db, cf_sock_addr_print(&addr));
		}

		++n_out;
	}

	// An all-filtered (empty) list also yields NULL, not "".

	char *str = n_out > 0 ? cf_dyn_buf_strdup(&db) : NULL;

	cf_dyn_buf_free(&db);
	return str;
}
1071
1072static void
1073enum_addrs(cf_addr_list *addrs)
1074{
1075 cf_ip_addr bin_addrs[CF_SOCK_CFG_MAX];
1076 uint32_t n_bin_addrs = CF_SOCK_CFG_MAX;
1077
1078 if (cf_inter_get_addr_all(bin_addrs, &n_bin_addrs) < 0) {
1079 cf_crash(AS_SERVICE_LIST, "address enumeration failed");
1080 }
1081
1082 addrs->n_addrs = 0;
1083
1084 for (uint32_t i = 0; i < n_bin_addrs; ++i) {
1085 if (cf_ip_addr_is_local(bin_addrs + i)) {
1086 continue;
1087 }
1088
1089 char addr_str[250];
1090 cf_ip_addr_to_string_safe(bin_addrs + i, addr_str, sizeof(addr_str));
1091
1092 addrs->addrs[addrs->n_addrs] = cf_strdup(addr_str);
1093 ++addrs->n_addrs;
1094 }
1095}
1096
1097static void
1098free_addrs(cf_addr_list *addrs)
1099{
1100 for (uint32_t i = 0; i < addrs->n_addrs; ++i) {
1101 cf_free((char *)addrs->addrs[i]);
1102 }
1103
1104 addrs->n_addrs = 0;
1105}
1106
// Determine our own service address strings (g_local) and publish them as
// the "service-*" info values. Runs once from as_service_list_init(),
// before any concurrency - hence no locking.
static void
populate_local(void)
{
	cf_detail(AS_SERVICE_LIST, "populating local info");

	// Populate from access addresses.

	// print_list() yields NULL for inactive or empty lists; the defaults
	// below fill in the NULLs afterwards.

	g_local.serv = print_list(&g_access.service, 0, ';', true);
	// NOTE(review): limit 1 - the legacy "services-alternate" value
	// apparently carries at most one address; confirm.
	g_local.serv_alt = print_list(&g_access.alt_service, 1, ';', true);

	g_local.clear_std = print_list(&g_access.service, 0, ',', false);
	g_local.tls_std = print_list(&g_access.tls_service, 0, ',', false);
	g_local.clear_alt = print_list(&g_access.alt_service, 0, ',', false);
	g_local.tls_alt = print_list(&g_access.alt_tls_service, 0, ',', false);

	// Alternate lists default to no addresses.

	// The empty-string defaults are literals, not cf_strdup() copies -
	// g_local's fields are never freed (unlike a peer_t's).

	if (g_local.serv_alt == NULL) {
		g_local.serv_alt = "";
	}

	if (g_local.clear_alt == NULL) {
		g_local.clear_alt = "";
	}

	if (g_local.tls_alt == NULL) {
		g_local.tls_alt = "";
	}

	// Standard lists default to all interface addresses.

	as_service_endpoint endp;
	enum_addrs(&endp.addrs);

	// Don't test g_local.serv, which is also NULL in IPv6-only setups.
	if (g_local.clear_std == NULL) {
		endp.port = g_access.service.port;
		g_local.serv = print_list(&endp, 0, ';', true);
	}

	if (g_local.clear_std == NULL) {
		endp.port = g_access.service.port;
		g_local.clear_std = print_list(&endp, 0, ',', false);
	}

	if (g_local.tls_std == NULL) {
		endp.port = g_access.tls_service.port;
		g_local.tls_std = print_list(&endp, 0, ',', false);
	}

	// Safe - print_list() copied the addresses it used.

	free_addrs(&endp.addrs);

	// Take care of unused (port == 0) standard lists, which are
	// still NULL at this point.

	if (g_local.serv == NULL) {
		g_local.serv = "";
	}

	if (g_local.clear_std == NULL) {
		g_local.clear_std = "";
	}

	if (g_local.tls_std == NULL) {
		g_local.tls_std = "";
	}

	// Finally, the TLS name.

	g_local.tls_name = g_config.tls_service.tls_our_name;

	if (g_local.tls_name == NULL) {
		g_local.tls_name = "";
	}

	// Populate info values from g_local.

	cf_detail(AS_SERVICE_LIST, "populating info values");

	for (uint32_t i = 0; i < ARRAY_COUNT(LOCAL_VALS); ++i) {
		const char *key = LOCAL_VALS[i].key;
		const char *val = *(LOCAL_VALS[i].proj(&g_local));
		set_info_val(key, val);
	}

	// Node 0 marks the dump as our own entry.

	dump_peer(0, &g_local);
}
1194
1195
1196//==========================================================
1197// Local helpers - info value management.
1198//
1199
1200static void
1201set_info_val(const char *key, const char *val)
1202{
1203 cf_detail(AS_SERVICE_LIST, "info val %s <- %s", key, val);
1204
1205 char hash_str[HASH_STR_SZ];
1206 strncpy(hash_str, key, HASH_STR_SZ); // pads with \0
1207
1208 // Remove existing value.
1209
1210 char *val_old;
1211 int32_t res = cf_shash_get_and_delete(g_info, hash_str, &val_old);
1212
1213 switch (res) {
1214 case CF_SHASH_OK:
1215 cf_free(val_old);
1216 break;
1217 case CF_SHASH_ERR_NOT_FOUND:
1218 break;
1219 default:
1220 cf_crash(AS_SERVICE_LIST, "cf_shash_get_and_delete() failed: %d", res);
1221 break;
1222 }
1223
1224 // Set new value.
1225
1226 char *val_dup = cf_strdup(val);
1227 res = cf_shash_put_unique(g_info, hash_str, &val_dup);
1228
1229 cf_assert(res == CF_SHASH_OK, AS_SERVICE_LIST,
1230 "cf_shash_put_unique() failed: %d", res);
1231}
1232
1233static void
1234print_info_val(const char *key, cf_dyn_buf *db)
1235{
1236 pthread_rwlock_rdlock(&g_info_lock);
1237
1238 char hash_str[HASH_STR_SZ];
1239 strncpy(hash_str, key, HASH_STR_SZ); // pads with \0
1240
1241 char *val;
1242 int32_t res = cf_shash_get(g_info, hash_str, &val);
1243
1244 cf_assert(res == CF_SHASH_OK, AS_SERVICE_LIST, "cf_shash_get() failed: %d",
1245 res);
1246
1247 cf_dyn_buf_append_string(db, val);
1248 cf_detail(AS_SERVICE_LIST, "info val %s -> %s", key, val);
1249
1250 pthread_rwlock_unlock(&g_info_lock);
1251}
1252
// cf_shash reduce callback over g_peers: append one peer's entry to a
// peers-style info value being built in par->db. The entry format is
//
//   [<node-id>,<tls-name>,[<addr-port-1>,<addr-port-2>,...]]
//
// with fields omitted per the print_par_t settings in udata (see below).
static int32_t
build_peers_reduce(const void *key, void *data, void *udata)
{
	cf_node node = *(const cf_node *)key;
	peer_t *p = *(peer_t **)data;
	print_par_t *par = udata;

	cf_detail(AS_SERVICE_LIST, "visiting node %lx", node);

	// Skip alumnus, if alumni excluded.

	if (par->present && !p->present) {
		cf_detail(AS_SERVICE_LIST, "node absent");
		return CF_SHASH_OK;
	}

	// Skip, if unchanged since the given delta cut-off generation.

	if (p->in_gen <= par->since) {
		cf_detail(AS_SERVICE_LIST, "no recent change");
		return CF_SHASH_OK;
	}

	// Read selected field from peer_t.

	const char *field = *(par->proj(p));

	if (field[0] == 0) {
		cf_detail(AS_SERVICE_LIST, "field empty");
		return CF_SHASH_OK;
	}

	// Append field value.

	cf_detail(AS_SERVICE_LIST, "adding %s", field);
	cf_dyn_buf *db = par->db;

	// Comma-separate consecutive entries.
	if (par->count > 0) {
		cf_dyn_buf_append_char(db, ',');
	}

	// 16 hex digits + NUL terminator.
	char node_str[17];
	cf_str_itoa_u64(node, node_str, 16);

	cf_dyn_buf_append_char(db, '[');
	cf_dyn_buf_append_string(db, node_str);
	cf_dyn_buf_append_char(db, ',');

	// (a) Typical case. Not a delta query, or a delta query but
	// not an alumnus. Report normally, i.e., as
	//
	//     [<node-id>,<tls-name>,[addr-port-1, addr-port-2, ...]]

	if (par->since == 0 || p->present) {
		// TLS name only for TLS-flavored info values; otherwise the
		// field between the two commas stays empty.
		if (par->tls) {
			cf_dyn_buf_append_string(db, p->tls_name);
		}

		cf_dyn_buf_append_char(db, ',');
		cf_dyn_buf_append_char(db, '[');

		// Strip the default-port suffix (par->strip) off each address;
		// buff is scratch space for the stripped copy.
		char buff[strlen(field) + 1];
		char *pref = strip_suff(field, par->strip, buff);
		cf_detail(AS_SERVICE_LIST, "stripped %s", pref);
		cf_dyn_buf_append_string(db, pref);

		cf_dyn_buf_append_char(db, ']');
	}

	// (b) Delta query and an alumnus. Include alumnus as
	//
	//     [<node-id>,,]
	//
	// in response to indicate that the node with ID <node-id>
	// (= the alumnus) went away.

	else {
		cf_dyn_buf_append_char(db, ',');
	}

	cf_dyn_buf_append_char(db, ']');

	++par->count;
	return CF_SHASH_OK;
}
1338
1339static void
1340build_peers(cf_dyn_buf *db, proj_t proj, const filter_t *filter)
1341{
1342 cf_dyn_buf_append_uint64(db, g_in_gen);
1343 cf_dyn_buf_append_char(db, ',');
1344
1345 cf_dyn_buf_append_string(db, DEFAULT_PORT);
1346 cf_dyn_buf_append_char(db, ',');
1347
1348 cf_dyn_buf_append_char(db, '[');
1349
1350 print_par_t print_par = {
1351 .db = db,
1352 .proj = proj,
1353 .strip = ":" DEFAULT_PORT,
1354 .tls = filter->tls,
1355 .present = filter->present,
1356 .since = filter->since,
1357 .count = 0
1358 };
1359
1360 cf_shash_reduce(g_peers, build_peers_reduce, &print_par);
1361
1362 cf_dyn_buf_append_char(db, ']');
1363}
1364
1365static int32_t
1366build_services_reduce(const void *key, void *data, void *udata)
1367{
1368 cf_node node = *(const cf_node *)key;
1369 peer_t *p = *(peer_t **)data;
1370 print_par_t *par = udata;
1371
1372 cf_detail(AS_SERVICE_LIST, "visiting node %lx", node);
1373
1374 // Skip alumnus, if alumni excluded.
1375
1376 if (par->present && !p->present) {
1377 cf_detail(AS_SERVICE_LIST, "node absent");
1378 return CF_SHASH_OK;
1379 }
1380
1381 // Read selected field from peer_t.
1382
1383 const char *field = *(par->proj(p));
1384
1385 if (field[0] == 0) {
1386 cf_detail(AS_SERVICE_LIST, "field empty");
1387 return CF_SHASH_OK;
1388 }
1389
1390 // Append field value.
1391
1392 cf_detail(AS_SERVICE_LIST, "adding %s", field);
1393 cf_dyn_buf *db = par->db;
1394
1395 if (par->count > 0) {
1396 cf_dyn_buf_append_char(db, ';');
1397 }
1398
1399 cf_dyn_buf_append_string(db, field);
1400 ++par->count;
1401
1402 return CF_SHASH_OK;
1403}
1404
1405static void
1406build_services(cf_dyn_buf *db, proj_t proj, const filter_t *filter)
1407{
1408 print_par_t print_par = {
1409 .db = db,
1410 .proj = proj,
1411 .strip = NULL,
1412 .tls = false,
1413 .present = filter->present,
1414 .since = 0,
1415 .count = 0
1416 };
1417
1418 cf_shash_reduce(g_peers, build_services_reduce, &print_par);
1419}
1420
1421static void
1422build_gen(cf_dyn_buf *db, proj_t proj, const filter_t *filter)
1423{
1424 (void)proj;
1425 (void)filter;
1426
1427 cf_dyn_buf_append_uint64(db, g_in_gen);
1428}
1429
1430static void
1431recalc(void)
1432{
1433 cf_detail(AS_SERVICE_LIST, "recalculating info values");
1434 pthread_rwlock_wrlock(&g_info_lock);
1435
1436 // Loop through all info values.
1437
1438 for (uint32_t i = 0; i < ARRAY_COUNT(PEER_VALS); ++i) {
1439 const peer_val_t *peer_val = PEER_VALS + i;
1440
1441 // Skip, if info value isn't refreshable.
1442
1443 if (peer_val->build == NULL) {
1444 continue;
1445 }
1446
1447 cf_detail(AS_SERVICE_LIST, "recalculating %s", peer_val->key);
1448
1449 // Build the info value into a temporary db.
1450
1451 cf_dyn_buf_define(db);
1452
1453 peer_val->build(&db, peer_val->proj, peer_val->filter);
1454 char *val = cf_dyn_buf_strdup(&db);
1455
1456 cf_dyn_buf_free(&db);
1457
1458 // Store the info value in g_info.
1459
1460 if (val == NULL) {
1461 set_info_val(peer_val->key, "");
1462 }
1463 else {
1464 set_info_val(peer_val->key, val);
1465 cf_free(val);
1466 }
1467 }
1468
1469 pthread_rwlock_unlock(&g_info_lock);
1470}
1471
1472
1473//==========================================================
1474// Local helpers - miscellaneous.
1475//
1476
1477static const char *
1478op_str(uint32_t op)
1479{
1480 switch (op) {
1481 case OP_UPDATE:
1482 return "OP_UPDATE";
1483 case OP_ACK:
1484 return "OP_ACK";
1485 case OP_UPDATE_REQ:
1486 return "OP_UPDATE_REQ";
1487 default:
1488 return "OP_???";
1489 }
1490}
1491
// Strip a given suffix off the elements of a comma-separated list. With suffix
// ":xxx", for example, the following would happen:
//
//   aaa:xxx,bbb:yyy,ccc:xxx,ddd:xxx -> aaa,bbb:yyy,ccc,ddd
//
// Used to strip the default port off "host:port" lists.
//
// Works from the end of "in" towards the beginning, one list element per
// outer iteration. "out" must hold at least strlen(in) + 1 bytes; the
// returned pointer points into "out" (NUL-terminated, right-aligned).

static char *
strip_suff(const char *in, const char *suff, char *out)
{
	size_t in_len = strlen(in);
	size_t suff_len = strlen(suff);

	size_t i_in = in_len;
	size_t i_out = in_len;

	out[i_out] = 0;

	// Loop while any input remains - not just while a whole suffix could
	// still fit. Otherwise a leading element shorter than the suffix gets
	// dropped, e.g. "ab,cd:xx" with ":xx" would yield ",cd", not "ab,cd".

	while (i_in > 0) {
		// Skip the suffix, if the current element ends with it.

		if (i_in >= suff_len &&
				memcmp(in + i_in - suff_len, suff, suff_len) == 0) {
			i_in -= suff_len;
		}

		// Copy the rest of the element, including its leading comma.

		while (i_in > 0) {
			out[--i_out] = in[--i_in];

			if (in[i_in] == ',') {
				break;
			}
		}
	}

	return out + i_out;
}
1526