/*
 * exchange.c
 *
 * Copyright (C) 2016 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/
 */

#include "fabric/exchange.h"

#include <errno.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/param.h> // For MAX() and MIN().

#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_atomic.h"
#include "citrusleaf/cf_clock.h"
#include "citrusleaf/cf_queue.h"

#include "cf_thread.h"
#include "dynbuf.h"
#include "fault.h"
#include "shash.h"
#include "socket.h"

#include "base/cfg.h"
#include "base/datamodel.h"
#include "base/stats.h"
#include "fabric/fabric.h"
#include "fabric/hb.h"
#include "fabric/partition_balance.h"
#include "storage/storage.h"

/*
 * Overview
 * ========
 * Cluster data exchange state machine. For now, exchanges per-namespace
 * partition versions after every cluster change.
 *
 * State transition diagram
 * ========================
 * The exchange state transition diagram responds to three events
 * 1. Incoming message
 * 2. Timer event
 * 3. Clustering module's cluster change event.
 *
 * There are four states
 * 1. Rest - the exchange is complete with all exchanged data committed.
 * 2. Exchanging - the cluster has changed since the last commit and new data
 * exchange is in progress.
 * 3. Ready to commit - this node has sent its exchange data to all cluster
 * members, received corresponding acks and also exchange data from all cluster
 * members.
 * 4. Orphaned - this node is an orphan. After a timeout, client transactions
 * are blocked.
 *
 * Exchange starts in the orphaned state.
 *
 * Code organization
 * =================
 *
 * There are different sections for each state. Each state has a dispatcher
 * which delegates the event handling to a state specific function.
 *
 * Locks
 * =====
 * 1. g_exchange_lock - protects the exchange state machine.
 * 2. g_exchanged_info_lock - prevents update of exchanged data for a round
 * while exchange is in progress.
 * 3. g_exchange_commited_cluster_lock - a finer-grained, committed cluster r/w
 * lock, used to allow read access to committed cluster data even while exchange
 * is busy and holding on to the global exchange state machine lock.
 * 4. g_external_event_publisher_lock - ensures the order of external events
 * published is maintained.
 *
 * Lock order
 * ==========
 * Locks should be obtained in the order mentioned above and relinquished in the
 * reverse order.
 */
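
/*
 * An illustrative sketch (not normative) of the main transitions described
 * above:
 *
 *   Orphaned ---(cluster change)---> Exchanging
 *   Exchanging ---(data sent and acked, all peer data received)---> Ready to
 *   commit
 *   Ready to commit ---(commit message from principal)---> Rest
 *   Rest / Exchanging / Ready to commit ---(cluster change)---> Exchanging
 *   any state ---(orphaned by the clustering module)---> Orphaned
 *
 * Timer events do not change state by themselves; they drive retransmission of
 * unacked data and ready-to-commit messages.
 */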

/*
 * ----------------------------------------------------------------------------
 * Constants
 * ----------------------------------------------------------------------------
 */

/**
 * Exchange protocol version information.
 */
#define AS_EXCHANGE_PROTOCOL_IDENTIFIER 1

/**
 * A soft limit for the maximum cluster size. Meant to optimize hash and list
 * data structures and not as a limit on the number of nodes.
 */
#define AS_EXCHANGE_CLUSTER_MAX_SIZE_SOFT 200

/**
 * A soft limit for the maximum number of unique vinfo's in a namespace. Meant
 * to optimize hash and list data structures and not as a limit on the number
 * of vinfos processed.
 */
#define AS_EXCHANGE_UNIQUE_VINFO_MAX_SIZE_SOFT 200

/**
 * Average number of partitions for a version information. Used as initial
 * allocation size for every unique vinfo, hence a smaller value.
 */
#define AS_EXCHANGE_VINFO_NUM_PIDS_AVG 1024

/**
 * Maximum event listeners.
 */
#define AS_EXTERNAL_EVENT_LISTENER_MAX 7

/*
 * ----------------------------------------------------------------------------
 * Exchange data format for namespaces payload
 * ----------------------------------------------------------------------------
 */

/**
 * Partition data exchanged for each unique vinfo for a namespace.
 */
typedef struct as_exchange_vinfo_payload_s
{
	/**
	 * The partition vinfo.
	 */
	as_partition_version vinfo;

	/**
	 * Count of partitions having this vinfo.
	 */
	uint32_t num_pids;

	/**
	 * Partitions having this vinfo.
	 */
	uint16_t pids[];
}__attribute__((__packed__)) as_exchange_vinfo_payload;

/**
 * Information exchanged for a single namespace.
 */
typedef struct as_exchange_ns_vinfos_payload_s
{
	/**
	 * Count of version infos.
	 */
	uint32_t num_vinfos;

	/**
	 * Partition version information for each unique version.
	 */
	as_exchange_vinfo_payload vinfos[];
}__attribute__((__packed__)) as_exchange_ns_vinfos_payload;
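
/*
 * Illustrative example (values are hypothetical): a namespace with two unique
 * vinfos, the first covering pids 0 and 5 and the second covering pid 7, is
 * serialized into this payload as, in order:
 *
 *   uint32_t num_vinfos = 2
 *   as_partition_version vinfo A, uint32_t num_pids = 2, uint16_t pids 0, 5
 *   as_partition_version vinfo B, uint32_t num_pids = 1, uint16_t pid 7
 *
 * Both structures are packed, so there is no padding between fields on the
 * wire.
 */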

/**
 * Received data stored per node, per namespace, before actual commit.
 */
typedef struct as_exchange_node_namespace_data_s
{
	/**
	 * Mapped local namespace.
	 */
	as_namespace* local_namespace;

	/**
	 * Partition versions for this namespace. This field is reused across
	 * exchange rounds and may not be null even if the local namespace is null.
	 */
	as_exchange_ns_vinfos_payload* partition_versions;

	/**
	 * Sender's rack id.
	 */
	uint32_t rack_id;

	/**
	 * Sender's roster generation.
	 */
	uint32_t roster_generation;

	/**
	 * Sender's roster count.
	 */
	uint32_t roster_count;

	/**
	 * Sending node's roster for this namespace.
	 */
	cf_node* roster;

	/**
	 * Sending node's roster rack-ids for this namespace.
	 */
	uint32_t* roster_rack_ids;

	/**
	 * Sender's eventual regime for this namespace.
	 */
	uint32_t eventual_regime;

	/**
	 * Sender's rebalance regime for this namespace.
	 */
	uint32_t rebalance_regime;

	/**
	 * Sender's rebalance flags for this namespace.
	 */
	uint32_t rebalance_flags;
} as_exchange_node_namespace_data;

/**
 * Exchanged data for a single node.
 */
typedef struct as_exchange_node_data_s
{
	/**
	 * Used by exchange listeners during upgrades for compatibility purposes.
	 */
	uint32_t compatibility_id;

	/**
	 * Number of sender's namespaces that have a matching local namespace.
	 */
	uint32_t num_namespaces;

	/**
	 * Data for sender's namespaces having a matching local namespace.
	 */
	as_exchange_node_namespace_data namespace_data[AS_NAMESPACE_SZ];
} as_exchange_node_data;

/*
 * ----------------------------------------------------------------------------
 * Exchange internal data structures
 * ----------------------------------------------------------------------------
 */

/**
 * Exchange subsystem status.
 */
typedef enum
{
	AS_EXCHANGE_SYS_STATE_UNINITIALIZED,
	AS_EXCHANGE_SYS_STATE_RUNNING,
	AS_EXCHANGE_SYS_STATE_SHUTTING_DOWN,
	AS_EXCHANGE_SYS_STATE_STOPPED
} as_exchange_sys_state;

/**
 * Exchange message types.
 */
typedef enum
{
	/**
	 * Exchange data for one node.
	 */
	AS_EXCHANGE_MSG_TYPE_DATA,

	/**
	 * Ack on receipt of exchanged data.
	 */
	AS_EXCHANGE_MSG_TYPE_DATA_ACK,

	/**
	 * Not used.
	 */
	AS_EXCHANGE_MSG_TYPE_DATA_NACK,

	/**
	 * The source is ready to commit exchanged information.
	 */
	AS_EXCHANGE_MSG_TYPE_READY_TO_COMMIT,

	/**
	 * Message from the principal asking all nodes to commit the exchanged
	 * information.
	 */
	AS_EXCHANGE_MSG_TYPE_COMMIT,

	/**
	 * Sentinel value for exchange message types.
	 */
	AS_EXCHANGE_MSG_TYPE_SENTINEL
} as_exchange_msg_type;
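
/*
 * A typical successful round, as sketched by the message types above: every
 * node broadcasts its DATA message and acks peers' DATA with DATA_ACK. A node
 * that has all acks and all peer data sends READY_TO_COMMIT to the principal,
 * which broadcasts COMMIT once every cluster member is ready.
 */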

/**
 * Internal exchange event type.
 */
typedef enum
{
	/**
	 * Cluster change event.
	 */
	AS_EXCHANGE_EVENT_CLUSTER_CHANGE,

	/**
	 * Timer event.
	 */
	AS_EXCHANGE_EVENT_TIMER,

	/**
	 * Incoming message event.
	 */
	AS_EXCHANGE_EVENT_MSG,
} as_exchange_event_type;

/**
 * Internal exchange event.
 */
typedef struct as_exchange_event_s
{
	/**
	 * The type of the event.
	 */
	as_exchange_event_type type;

	/**
	 * Message for incoming message events.
	 */
	msg* msg;

	/**
	 * Source for incoming message events.
	 */
	cf_node msg_source;

	/**
	 * Clustering event instance for clustering events.
	 */
	as_clustering_event* clustering_event;
} as_exchange_event;

/**
 * Exchange subsystem state in the state transition diagram.
 */
typedef enum as_exchange_state_s
{
	/**
	 * Exchange subsystem is at rest with all exchanged data synchronized and
	 * committed.
	 */
	AS_EXCHANGE_STATE_REST,

	/**
	 * Data exchange is in progress.
	 */
	AS_EXCHANGE_STATE_EXCHANGING,

	/**
	 * Data exchange is complete and this node is ready to commit data.
	 */
	AS_EXCHANGE_STATE_READY_TO_COMMIT,

	/**
	 * Self node is orphaned.
	 */
	AS_EXCHANGE_STATE_ORPHANED
} as_exchange_state;

/**
 * State for a single node in the succession list.
 */
typedef struct as_exchange_node_state_s
{
	/**
	 * Indicates if peer node has acknowledged send from self.
	 */
	bool send_acked;

	/**
	 * Indicates if self node has received data from this peer.
	 */
	bool received;

	/**
	 * Indicates if this peer node is ready to commit. Only relevant and used
	 * by the current principal.
	 */
	bool is_ready_to_commit;

	/**
	 * Exchange data received from this peer node. Member variables may be heap
	 * allocated and hence should be freed carefully while discarding this
	 * structure instance.
	 */
	as_exchange_node_data* data;
} as_exchange_node_state;

/**
 * State maintained by the exchange subsystem.
 */
typedef struct as_exchange_s
{
	/**
	 * Exchange subsystem status.
	 */
	as_exchange_sys_state sys_state;

	/**
	 * Exchange state in the state transition diagram.
	 */
	as_exchange_state state;

	/**
	 * Time when this node's exchange data was sent out.
	 */
	cf_clock send_ts;

	/**
	 * Time when this node's ready to commit was sent out.
	 */
	cf_clock ready_to_commit_send_ts;

	/**
	 * Thread id of the timer event generator.
	 */
	pthread_t timer_tid;

	/**
	 * Nodes that are not yet ready to commit.
	 */
	cf_vector ready_to_commit_pending_nodes;

	/**
	 * Current cluster key.
	 */
	as_cluster_key cluster_key;

	/**
	 * Exchange's copy of the succession list.
	 */
	cf_vector succession_list;

	/**
	 * The principal node in current succession list. Always the first node.
	 */
	cf_node principal;

	/**
	 * Used by exchange listeners during upgrades for compatibility purposes.
	 */
	uint32_t compatibility_ids[AS_CLUSTER_SZ];

	/**
	 * Used by exchange listeners during upgrades for compatibility purposes.
	 */
	uint32_t min_compatibility_id;

	/**
	 * Committed cluster generation.
	 */
	uint64_t committed_cluster_generation;

	/**
	 * Last committed cluster key.
	 */
	as_cluster_key committed_cluster_key;

	/**
	 * Last committed cluster size - size of the succession list.
	 */
	uint32_t committed_cluster_size;

	/**
	 * Last committed exchange's succession list.
	 */
	cf_vector committed_succession_list;

	/**
	 * The principal node in the committed succession list. Always the first
	 * node.
	 */
	cf_node committed_principal;

	/**
	 * The time this node entered orphan state.
	 */
	cf_clock orphan_state_start_time;

	/**
	 * Indicates if transactions have already been blocked in the orphan state.
	 */
	bool orphan_state_are_transactions_blocked;

	/**
	 * Will have an as_exchange_node_state entry for every node in the
	 * succession list.
	 */
	cf_shash* nodeid_to_node_state;

	/**
	 * Self node's partition version payload for current round.
	 */
	cf_dyn_buf self_data_dyn_buf[AS_NAMESPACE_SZ];

	/**
	 * This node's exchange data fabric message to send for current round.
	 */
	msg* data_msg;
} as_exchange;

/**
 * Internal storage for external event listeners.
 */
typedef struct as_exchange_event_listener_s
{
	/**
	 * The listener's callback function.
	 */
	as_exchange_cluster_changed_cb event_callback;

	/**
	 * The listener's user data object, passed back as-is to the callback
	 * function.
	 */
	void* udata;
} as_exchange_event_listener;

/**
 * External event publisher state.
 */
typedef struct as_exchange_external_event_publisher_s
{
	/**
	 * State of the external event publisher.
	 */
	as_exchange_sys_state sys_state;

	/**
	 * Indicates if there is an event to publish.
	 */
	bool event_queued;

	/**
	 * The pending event to publish.
	 */
	as_exchange_cluster_changed_event to_publish;

	/**
	 * The static succession list published with the message.
	 */
	cf_vector published_succession_list;

	/**
	 * Condition variable to signal a pending event.
	 */
	pthread_cond_t is_pending;

	/**
	 * Thread id of the publisher thread.
	 */
	pthread_t event_publisher_tid;

	/**
	 * Mutex to protect the condition variable.
	 */
	pthread_mutex_t is_pending_mutex;

	/**
	 * External event listeners.
	 */
	as_exchange_event_listener event_listeners[AS_EXTERNAL_EVENT_LISTENER_MAX];

	/**
	 * Event listener count.
	 */
	uint32_t event_listener_count;
} as_exchange_external_event_publisher;

/*
 * ----------------------------------------------------------------------------
 * Externs
 * ----------------------------------------------------------------------------
 */
void
as_skew_monitor_update();

/*
 * ----------------------------------------------------------------------------
 * Globals
 * ----------------------------------------------------------------------------
 */

/**
 * Singleton exchange state, all initialized to zero.
 */
static as_exchange g_exchange = { 0 };

/**
 * Rebalance flags.
 */
typedef enum
{
	AS_EXCHANGE_REBALANCE_FLAG_UNIFORM = 0x01,
	AS_EXCHANGE_REBALANCE_FLAG_QUIESCE = 0x02
} as_exchange_rebalance_flags;

/**
 * The fields in the exchange message. Never change the order of existing
 * fields or insert new fields in between - only append.
 */
typedef enum
{
	AS_EXCHANGE_MSG_ID,
	AS_EXCHANGE_MSG_TYPE,
	AS_EXCHANGE_MSG_CLUSTER_KEY,
	AS_EXCHANGE_MSG_NAMESPACES,
	AS_EXCHANGE_MSG_NS_PARTITION_VERSIONS,
	AS_EXCHANGE_MSG_NS_RACK_IDS,
	AS_EXCHANGE_MSG_NS_ROSTER_GENERATIONS,
	AS_EXCHANGE_MSG_NS_ROSTERS,
	AS_EXCHANGE_MSG_NS_ROSTERS_RACK_IDS,
	AS_EXCHANGE_MSG_NS_EVENTUAL_REGIMES,
	AS_EXCHANGE_MSG_NS_REBALANCE_REGIMES,
	AS_EXCHANGE_MSG_NS_REBALANCE_FLAGS,
	AS_EXCHANGE_MSG_COMPATIBILITY_ID,

	NUM_EXCHANGE_MSG_FIELDS
} as_exchange_msg_fields;

/**
 * Exchange message template.
 */
static const msg_template exchange_msg_template[] = {
		{ AS_EXCHANGE_MSG_ID, M_FT_UINT32 },
		{ AS_EXCHANGE_MSG_TYPE, M_FT_UINT32 },
		{ AS_EXCHANGE_MSG_CLUSTER_KEY, M_FT_UINT64 },
		{ AS_EXCHANGE_MSG_NAMESPACES, M_FT_MSGPACK },
		{ AS_EXCHANGE_MSG_NS_PARTITION_VERSIONS, M_FT_MSGPACK },
		{ AS_EXCHANGE_MSG_NS_RACK_IDS, M_FT_MSGPACK },
		{ AS_EXCHANGE_MSG_NS_ROSTER_GENERATIONS, M_FT_MSGPACK },
		{ AS_EXCHANGE_MSG_NS_ROSTERS, M_FT_MSGPACK },
		{ AS_EXCHANGE_MSG_NS_ROSTERS_RACK_IDS, M_FT_MSGPACK },
		{ AS_EXCHANGE_MSG_NS_EVENTUAL_REGIMES, M_FT_MSGPACK },
		{ AS_EXCHANGE_MSG_NS_REBALANCE_REGIMES, M_FT_MSGPACK },
		{ AS_EXCHANGE_MSG_NS_REBALANCE_FLAGS, M_FT_MSGPACK },
		{ AS_EXCHANGE_MSG_COMPATIBILITY_ID, M_FT_UINT32 }
};

COMPILER_ASSERT(sizeof(exchange_msg_template) / sizeof(msg_template) ==
		NUM_EXCHANGE_MSG_FIELDS);
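
/*
 * For illustration, a minimal exchange message is built by fetching a fabric
 * msg of this type and setting the three compulsory fields, exactly as
 * exchange_msg_src_fill() does further below:
 *
 *   msg* m = as_fabric_msg_get(M_TYPE_EXCHANGE);
 *   msg_set_uint32(m, AS_EXCHANGE_MSG_ID, AS_EXCHANGE_PROTOCOL_IDENTIFIER);
 *   msg_set_uint32(m, AS_EXCHANGE_MSG_TYPE, AS_EXCHANGE_MSG_TYPE_DATA);
 *   msg_set_uint64(m, AS_EXCHANGE_MSG_CLUSTER_KEY, g_exchange.cluster_key);
 */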

/**
 * Global lock to set or get exchanged info from other threads.
 */
pthread_mutex_t g_exchanged_info_lock = PTHREAD_MUTEX_INITIALIZER;

/**
 * Global lock to serialize all reads and writes to the exchange state.
 */
pthread_mutex_t g_exchange_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;

/**
 * Acquire a lock on the exchange subsystem.
 */
#define EXCHANGE_LOCK() \
({ \
	pthread_mutex_lock (&g_exchange_lock); \
	LOCK_DEBUG("locked in %s", __FUNCTION__); \
})

/**
 * Relinquish the lock on the exchange subsystem.
 */
#define EXCHANGE_UNLOCK() \
({ \
	pthread_mutex_unlock (&g_exchange_lock); \
	LOCK_DEBUG("unlocked in %s", __FUNCTION__); \
})

/**
 * Global lock to set or get the committed exchange cluster. This is a
 * finer-grained lock that allows read access to the committed cluster even
 * while exchange is busy, for example rebalancing, under the exchange lock.
 */
pthread_rwlock_t g_exchange_commited_cluster_lock = PTHREAD_RWLOCK_INITIALIZER;

/**
 * Acquire a read lock on the committed exchange cluster.
 */
#define EXCHANGE_COMMITTED_CLUSTER_RLOCK() \
({ \
	pthread_rwlock_rdlock (&g_exchange_commited_cluster_lock); \
	LOCK_DEBUG("committed data locked in %s", __FUNCTION__); \
})

/**
 * Acquire a write lock on the committed exchange cluster.
 */
#define EXCHANGE_COMMITTED_CLUSTER_WLOCK() \
({ \
	pthread_rwlock_wrlock (&g_exchange_commited_cluster_lock); \
	LOCK_DEBUG("committed data locked in %s", __FUNCTION__); \
})

/**
 * Relinquish the lock on the committed exchange cluster.
 */
#define EXCHANGE_COMMITTED_CLUSTER_UNLOCK() \
({ \
	pthread_rwlock_unlock (&g_exchange_commited_cluster_lock); \
	LOCK_DEBUG("committed data unlocked in %s", __FUNCTION__); \
})

/**
 * Singleton external events publisher.
 */
static as_exchange_external_event_publisher g_external_event_publisher;

/**
 * The fat lock for all clustering event listener changes.
 */
static pthread_mutex_t g_external_event_publisher_lock =
		PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;

/**
 * Acquire a lock on the event publisher.
 */
#define EXTERNAL_EVENT_PUBLISHER_LOCK() \
({ \
	pthread_mutex_lock (&g_external_event_publisher_lock); \
	LOCK_DEBUG("publisher locked in %s", __FUNCTION__); \
})

/**
 * Relinquish the lock on the external event publisher.
 */
#define EXTERNAL_EVENT_PUBLISHER_UNLOCK() \
({ \
	pthread_mutex_unlock (&g_external_event_publisher_lock); \
	LOCK_DEBUG("publisher unlocked in %s", __FUNCTION__); \
})

/*
 * ----------------------------------------------------------------------------
 * Logging macros.
 * ----------------------------------------------------------------------------
 */

/**
 * Wrappers to log in the exchange module's context at various severities.
 */
#define CRASH(format, ...) cf_crash(AS_EXCHANGE, format, ##__VA_ARGS__)
#define WARNING(format, ...) cf_warning(AS_EXCHANGE, format, ##__VA_ARGS__)
#define INFO(format, ...) cf_info(AS_EXCHANGE, format, ##__VA_ARGS__)
#define DEBUG(format, ...) cf_debug(AS_EXCHANGE, format, ##__VA_ARGS__)
#define DETAIL(format, ...) cf_detail(AS_EXCHANGE, format, ##__VA_ARGS__)
#define LOG(severity, format, ...) \
({ \
	switch (severity) { \
	case CF_CRITICAL: \
		CRASH(format, ##__VA_ARGS__); \
		break; \
	case CF_WARNING: \
		WARNING(format, ##__VA_ARGS__); \
		break; \
	case CF_INFO: \
		INFO(format, ##__VA_ARGS__); \
		break; \
	case CF_DEBUG: \
		DEBUG(format, ##__VA_ARGS__); \
		break; \
	case CF_DETAIL: \
		DETAIL(format, ##__VA_ARGS__); \
		break; \
	default: \
		break; \
	} \
})

/**
 * Size of the (per-namespace) self payload dynamic buffer.
 */
#define AS_EXCHANGE_SELF_DYN_BUF_SIZE() (AS_EXCHANGE_UNIQUE_VINFO_MAX_SIZE_SOFT \
		* ((AS_EXCHANGE_VINFO_NUM_PIDS_AVG * sizeof(uint16_t)) \
				+ sizeof(as_partition_version)))
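
/*
 * Worked example: with the soft limits above this reserves roughly
 * 200 * (1024 * 2 + sizeof(as_partition_version)) bytes per namespace, i.e.
 * 400 KiB of pid storage plus 200 version structs. The exact total depends on
 * sizeof(as_partition_version), which is defined elsewhere.
 */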

/**
 * Scratch size for exchange messages.
 * TODO: Compute this properly.
 */
#define AS_EXCHANGE_MSG_SCRATCH_SIZE 2048

#ifdef LOCK_DEBUG_ENABLED
#define LOCK_DEBUG(format, ...) DEBUG(format, ##__VA_ARGS__)
#else
#define LOCK_DEBUG(format, ...)
#endif

/**
 * Timer event generation interval.
 */
#define EXCHANGE_TIMER_TICK_INTERVAL() (75)

/**
 * Minimum timeout interval for sent exchange data.
 */
#define EXCHANGE_SEND_MIN_TIMEOUT() (MAX(75, as_hb_tx_interval_get() / 2))

/**
 * Maximum timeout interval for sent exchange data.
 */
#define EXCHANGE_SEND_MAX_TIMEOUT() (30000)

/**
 * Timeout for receiving commit message after transitioning to ready to commit.
 */
#define EXCHANGE_READY_TO_COMMIT_TIMEOUT() (EXCHANGE_SEND_MIN_TIMEOUT())

/**
 * Send timeout is a step function with this value as the interval for each
 * step.
 */
#define EXCHANGE_SEND_STEP_INTERVAL() \
(MAX(EXCHANGE_SEND_MIN_TIMEOUT(), as_hb_tx_interval_get()))

/**
 * Check if exchange is initialized.
 */
#define EXCHANGE_IS_INITIALIZED() \
({ \
	EXCHANGE_LOCK(); \
	bool initialized = (g_exchange.sys_state \
			!= AS_EXCHANGE_SYS_STATE_UNINITIALIZED); \
	EXCHANGE_UNLOCK(); \
	initialized; \
})

/**
 * Check if exchange is running.
 */
#define EXCHANGE_IS_RUNNING() \
({ \
	EXCHANGE_LOCK(); \
	bool running = (EXCHANGE_IS_INITIALIZED() \
			&& g_exchange.sys_state == AS_EXCHANGE_SYS_STATE_RUNNING); \
	EXCHANGE_UNLOCK(); \
	running; \
})

/**
 * Create temporary stack variables.
 */
#define TOKEN_PASTE(x, y) x##y
#define STACK_VAR(x, y) TOKEN_PASTE(x, y)

/**
 * Convert a vector to a stack allocated array.
 */
#define cf_vector_to_stack_array(vector_p, nodes_array_p, num_nodes_p) \
({ \
	*num_nodes_p = cf_vector_size(vector_p); \
	if (*num_nodes_p > 0) { \
		*nodes_array_p = alloca(sizeof(cf_node) * (*num_nodes_p)); \
		for (int i = 0; i < *num_nodes_p; i++) { \
			cf_vector_get(vector_p, i, &(*nodes_array_p)[i]); \
		} \
	} \
	else { \
		*nodes_array_p = NULL; \
	} \
})
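
/*
 * Typical usage, mirroring exchange_data_msg_send_pending_ack() below, with
 * node_vector being any cf_node vector. The array is alloca'd and hence only
 * valid within the calling frame:
 *
 *   cf_node* nodes;
 *   int num_nodes;
 *   cf_vector_to_stack_array(node_vector, &nodes, &num_nodes);
 */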

/**
 * Create and initialize a lockless, stack-allocated vector, initially sized to
 * store a cluster's worth of node elements.
 */
#define cf_vector_stack_create(value_type) \
({ \
	cf_vector * STACK_VAR(vector, __LINE__) = (cf_vector*)alloca( \
			sizeof(cf_vector)); \
	size_t buffer_size = AS_EXCHANGE_CLUSTER_MAX_SIZE_SOFT \
			* sizeof(value_type); \
	void* STACK_VAR(buff, __LINE__) = alloca(buffer_size); \
	cf_vector_init_smalloc( \
			STACK_VAR(vector, __LINE__), sizeof(value_type), \
			(uint8_t*)STACK_VAR(buff, __LINE__), buffer_size, \
			VECTOR_FLAG_INITZERO); \
	STACK_VAR(vector, __LINE__); \
})
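
/*
 * Example: cf_vector* v = cf_vector_stack_create(cf_node); yields a vector
 * backed by stack memory sized for AS_EXCHANGE_CLUSTER_MAX_SIZE_SOFT elements.
 * Growth beyond that presumably falls back to heap allocation, per the
 * _smalloc initializer.
 */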

/*
 * ----------------------------------------------------------------------------
 * Vector functions to be moved to cf_vector
 * ----------------------------------------------------------------------------
 */

/**
 * Convert a vector to an array.
 * FIXME: return pointer to the internal vector storage.
 */
static cf_node*
vector_to_array(cf_vector* vector)
{
	return (cf_node*)vector->vector;
}

/**
 * Clear / delete all entries in a vector.
 */
static void
vector_clear(cf_vector* vector)
{
	cf_vector_delete_range(vector, 0, cf_vector_size(vector));
}

/**
 * Find the index of an element in the vector. Equality is based on mem compare.
 *
 * @param vector the source vector.
 * @param element the element to find.
 * @return the index if the element is found, -1 otherwise.
 */
static int
vector_find(cf_vector* vector, const void* element)
{
	int element_count = cf_vector_size(vector);
	size_t value_len = VECTOR_ELEM_SZ(vector);
	for (int i = 0; i < element_count; i++) {
		// No null check required since we are iterating under a lock and within
		// vector bounds.
		void* src_element = cf_vector_getp(vector, i);
		if (src_element) {
			if (memcmp(element, src_element, value_len) == 0) {
				return i;
			}
		}
	}
	return -1;
}

/**
 * Copy all elements from the source vector to the destination vector. Assumes
 * the source and destination vectors are not being modified while the copy
 * operation is in progress.
 *
 * @param dest the destination vector.
 * @param src the source vector.
 * @return the number of elements copied.
 */
static int
vector_copy(cf_vector* dest, cf_vector* src)
{
	int element_count = cf_vector_size(src);
	int copied_count = 0;
	for (int i = 0; i < element_count; i++) {
		// No null check required since we are iterating under a lock and within
		// vector bounds.
		void* src_element = cf_vector_getp(src, i);
		if (src_element) {
			cf_vector_append(dest, src_element);
			copied_count++;
		}
	}
	return copied_count;
}

/**
 * Generate a hash code for a blob using Jenkins hash function.
 */
static uint32_t
exchange_blob_hash(const uint8_t* value, size_t value_size)
{
	uint32_t hash = 0;
	for (int i = 0; i < value_size; ++i) {
		hash += value[i];
		hash += (hash << 10);
		hash ^= (hash >> 6);
	}
	hash += (hash << 3);
	hash ^= (hash >> 11);
	hash += (hash << 15);

	return hash;
}

/**
 * Generate a hash code for a partition version info key.
 */
static uint32_t
exchange_vinfo_shash(const void* value)
{
	return exchange_blob_hash((const uint8_t*)value,
			sizeof(as_partition_version));
}

/*
 * ----------------------------------------------------------------------------
 * Clustering external event publisher
 * ----------------------------------------------------------------------------
 */

/**
 * Check if event publisher is running.
 */
static bool
exchange_external_event_publisher_is_running()
{
	EXTERNAL_EVENT_PUBLISHER_LOCK();
	bool running = g_external_event_publisher.sys_state
			== AS_EXCHANGE_SYS_STATE_RUNNING;
	EXTERNAL_EVENT_PUBLISHER_UNLOCK();
	return running;
}

/**
 * Initialize the event publisher.
 */
static void
exchange_external_event_publisher_init()
{
	EXTERNAL_EVENT_PUBLISHER_LOCK();
	memset(&g_external_event_publisher, 0, sizeof(g_external_event_publisher));
	cf_vector_init(&g_external_event_publisher.published_succession_list,
			sizeof(cf_node),
			AS_EXCHANGE_CLUSTER_MAX_SIZE_SOFT, VECTOR_FLAG_INITZERO);

	pthread_mutex_init(&g_external_event_publisher.is_pending_mutex, NULL);
	pthread_cond_init(&g_external_event_publisher.is_pending, NULL);
	EXTERNAL_EVENT_PUBLISHER_UNLOCK();
}

/**
 * Register a clustering event listener.
 */
static void
exchange_external_event_listener_register(
		as_exchange_cluster_changed_cb event_callback, void* udata)
{
	EXTERNAL_EVENT_PUBLISHER_LOCK();

	if (g_external_event_publisher.event_listener_count
			>= AS_EXTERNAL_EVENT_LISTENER_MAX) {
		CRASH("cannot register more than %d event listeners",
				AS_EXTERNAL_EVENT_LISTENER_MAX);
	}

	as_exchange_event_listener* listener =
			&g_external_event_publisher.event_listeners[g_external_event_publisher.event_listener_count];

	listener->event_callback = event_callback;
	listener->udata = udata;

	g_external_event_publisher.event_listener_count++;

	EXTERNAL_EVENT_PUBLISHER_UNLOCK();
}

/**
 * Wake up the publisher thread.
 */
static void
exchange_external_event_publisher_thr_wakeup()
{
	pthread_mutex_lock(&g_external_event_publisher.is_pending_mutex);
	pthread_cond_signal(&g_external_event_publisher.is_pending);
	pthread_mutex_unlock(&g_external_event_publisher.is_pending_mutex);
}

/**
 * Queue up an external event to publish.
 */
static void
exchange_external_event_queue(as_exchange_cluster_changed_event* event)
{
	EXTERNAL_EVENT_PUBLISHER_LOCK();
	memcpy(&g_external_event_publisher.to_publish, event,
			sizeof(g_external_event_publisher.to_publish));

	vector_clear(&g_external_event_publisher.published_succession_list);
	if (event->succession) {
		// Use the static list for the published event, so that the input event
		// object can be destroyed irrespective of when it is published.
		for (int i = 0; i < event->cluster_size; i++) {
			cf_vector_append(
					&g_external_event_publisher.published_succession_list,
					&event->succession[i]);
		}
		g_external_event_publisher.to_publish.succession = vector_to_array(
				&g_external_event_publisher.published_succession_list);
	}
	else {
		g_external_event_publisher.to_publish.succession = NULL;
	}

	g_external_event_publisher.event_queued = true;

	EXTERNAL_EVENT_PUBLISHER_UNLOCK();

	// Wake up the publisher thread.
	exchange_external_event_publisher_thr_wakeup();
}

/**
 * Publish external events if any are pending.
 */
static void
exchange_external_events_publish()
{
	EXTERNAL_EVENT_PUBLISHER_LOCK();

	if (g_external_event_publisher.event_queued) {
		g_external_event_publisher.event_queued = false;
		for (uint32_t i = 0;
				i < g_external_event_publisher.event_listener_count; i++) {
			(g_external_event_publisher.event_listeners[i].event_callback)(
					&g_external_event_publisher.to_publish,
					g_external_event_publisher.event_listeners[i].udata);
		}
	}
	EXTERNAL_EVENT_PUBLISHER_UNLOCK();
}

/**
 * External event publisher thread.
 */
static void*
exchange_external_event_publisher_thr(void* arg)
{
	pthread_mutex_lock(&g_external_event_publisher.is_pending_mutex);

	while (true) {
		pthread_cond_wait(&g_external_event_publisher.is_pending,
				&g_external_event_publisher.is_pending_mutex);
		if (exchange_external_event_publisher_is_running()) {
			exchange_external_events_publish();
		}
		else {
			// Publisher stopped, exit the thread.
			break;
		}
	}

	return NULL;
}

/**
 * Start the event publisher.
 */
static void
exchange_external_event_publisher_start()
{
	EXTERNAL_EVENT_PUBLISHER_LOCK();
	g_external_event_publisher.sys_state = AS_EXCHANGE_SYS_STATE_RUNNING;
	g_external_event_publisher.event_publisher_tid =
			cf_thread_create_joinable(exchange_external_event_publisher_thr,
					NULL);
	EXTERNAL_EVENT_PUBLISHER_UNLOCK();
}

/**
 * Stop the event publisher.
 */
static void
external_event_publisher_stop()
{
	EXTERNAL_EVENT_PUBLISHER_LOCK();
	g_external_event_publisher.sys_state = AS_EXCHANGE_SYS_STATE_SHUTTING_DOWN;
	EXTERNAL_EVENT_PUBLISHER_UNLOCK();

	exchange_external_event_publisher_thr_wakeup();
	cf_thread_join(g_external_event_publisher.event_publisher_tid);

	EXTERNAL_EVENT_PUBLISHER_LOCK();
	g_external_event_publisher.sys_state = AS_EXCHANGE_SYS_STATE_STOPPED;
	g_external_event_publisher.event_queued = false;
	EXTERNAL_EVENT_PUBLISHER_UNLOCK();
}

/*
 * ----------------------------------------------------------------------------
 * Node state related
 * ----------------------------------------------------------------------------
 */

/**
 * Initialize node state.
 */
static void
exchange_node_state_init(as_exchange_node_state* node_state)
{
	memset(node_state, 0, sizeof(*node_state));

	node_state->data = cf_calloc(1, sizeof(as_exchange_node_data));
}

/**
 * Reset node state.
 */
static void
exchange_node_state_reset(as_exchange_node_state* node_state)
{
	node_state->send_acked = false;
	node_state->received = false;
	node_state->is_ready_to_commit = false;

	node_state->data->num_namespaces = 0;
	for (int i = 0; i < AS_NAMESPACE_SZ; i++) {
		node_state->data->namespace_data[i].local_namespace = NULL;
	}
}

/**
 * Destroy node state.
 */
static void
exchange_node_state_destroy(as_exchange_node_state* node_state)
{
	for (int i = 0; i < AS_NAMESPACE_SZ; i++) {
		if (node_state->data->namespace_data[i].partition_versions) {
			cf_free(node_state->data->namespace_data[i].partition_versions);
		}

		if (node_state->data->namespace_data[i].roster) {
			cf_free(node_state->data->namespace_data[i].roster);
		}

		if (node_state->data->namespace_data[i].roster_rack_ids) {
			cf_free(node_state->data->namespace_data[i].roster_rack_ids);
		}
	}

	cf_free(node_state->data);
}

/**
 * Reduce function to match node -> node state hash to the succession list.
 * Should always be invoked under a lock over the main hash.
 */
static int
exchange_node_states_reset_reduce(const void* key, void* data, void* udata)
{
	const cf_node* node = (const cf_node*)key;
	as_exchange_node_state* node_state = (as_exchange_node_state*)data;

	int node_index = vector_find(&g_exchange.succession_list, node);
	if (node_index < 0) {
		// Node not in succession list.
		exchange_node_state_destroy(node_state);
		return CF_SHASH_REDUCE_DELETE;
	}

	exchange_node_state_reset(node_state);
	return CF_SHASH_OK;
}

/**
 * Adjust the nodeid_to_node_state hash to have an entry for every node in the
 * succession list with state reset for a new round of exchange. Removes entries
 * not in the succession list.
 */
static void
exchange_node_states_reset()
{
	EXCHANGE_LOCK();

	// Fix existing entries by resetting entries in succession and removing
	// entries not in the succession list.
	cf_shash_reduce(g_exchange.nodeid_to_node_state,
			exchange_node_states_reset_reduce, NULL);

	// Add missing entries.
	int succession_length = cf_vector_size(&g_exchange.succession_list);

	as_exchange_node_state temp_state;
	for (int i = 0; i < succession_length; i++) {
		cf_node nodeid;

		cf_vector_get(&g_exchange.succession_list, i, &nodeid);
		if (cf_shash_get(g_exchange.nodeid_to_node_state, &nodeid, &temp_state)
				== CF_SHASH_ERR_NOT_FOUND) {
			exchange_node_state_init(&temp_state);

			cf_shash_put(g_exchange.nodeid_to_node_state, &nodeid, &temp_state);
		}
	}

	EXCHANGE_UNLOCK();
}

/**
 * Reduce function to find nodes that have not acked self node's exchange data.
 */
static int
exchange_nodes_find_send_unacked_reduce(const void* key, void* data,
		void* udata)
{
	const cf_node* node = (const cf_node*)key;
	as_exchange_node_state* node_state = (as_exchange_node_state*)data;
	cf_vector* unacked = (cf_vector*)udata;

	if (!node_state->send_acked) {
		cf_vector_append(unacked, node);
	}
	return CF_SHASH_OK;
}

/**
 * Find nodes that have not acked self node's exchange data.
 */
static void
exchange_nodes_find_send_unacked(cf_vector* unacked)
{
	cf_shash_reduce(g_exchange.nodeid_to_node_state,
			exchange_nodes_find_send_unacked_reduce, unacked);
}

/**
 * Reduce function to find peer nodes from whom self node has not received
 * exchange data.
 */
static int
exchange_nodes_find_not_received_reduce(const void* key, void* data,
		void* udata)
{
	const cf_node* node = (const cf_node*)key;
	as_exchange_node_state* node_state = (as_exchange_node_state*)data;
	cf_vector* not_received = (cf_vector*)udata;

	if (!node_state->received) {
		cf_vector_append(not_received, node);
	}
	return CF_SHASH_OK;
}

/**
 * Find peer nodes from whom self node has not received exchange data.
 */
static void
exchange_nodes_find_not_received(cf_vector* not_received)
{
	cf_shash_reduce(g_exchange.nodeid_to_node_state,
			exchange_nodes_find_not_received_reduce, not_received);
}

/**
 * Reduce function to find peer nodes that are not ready to commit.
 */
static int
exchange_nodes_find_not_ready_to_commit_reduce(const void* key, void* data,
		void* udata)
{
	const cf_node* node = (const cf_node*)key;
	as_exchange_node_state* node_state = (as_exchange_node_state*)data;
	cf_vector* not_ready_to_commit = (cf_vector*)udata;

	if (!node_state->is_ready_to_commit) {
		cf_vector_append(not_ready_to_commit, node);
	}
	return CF_SHASH_OK;
}

/**
 * Find peer nodes that are not ready to commit.
 */
static void
exchange_nodes_find_not_ready_to_commit(cf_vector* not_ready_to_commit)
{
	cf_shash_reduce(g_exchange.nodeid_to_node_state,
			exchange_nodes_find_not_ready_to_commit_reduce,
			not_ready_to_commit);
}

/**
 * Update the node state for a node.
 */
static void
exchange_node_state_update(cf_node nodeid, as_exchange_node_state* node_state)
{
	cf_shash_put(g_exchange.nodeid_to_node_state, &nodeid, node_state);
}

/**
 * Get state of a node from the hash. If not found crash because this entry
 * should be present in the hash.
 */
static void
exchange_node_state_get_safe(cf_node nodeid, as_exchange_node_state* node_state)
{
	if (cf_shash_get(g_exchange.nodeid_to_node_state, &nodeid, node_state)
			== CF_SHASH_ERR_NOT_FOUND) {
		CRASH(
				"node entry for node %"PRIx64" missing from node state hash", nodeid);
	}
}

/*
 * ----------------------------------------------------------------------------
 * Message related
 * ----------------------------------------------------------------------------
 */

/**
 * Fill compulsory fields in a message common to all message types.
 */
static void
exchange_msg_src_fill(msg* msg, as_exchange_msg_type type)
{
	EXCHANGE_LOCK();
	msg_set_uint32(msg, AS_EXCHANGE_MSG_ID, AS_EXCHANGE_PROTOCOL_IDENTIFIER);
	msg_set_uint64(msg, AS_EXCHANGE_MSG_CLUSTER_KEY, g_exchange.cluster_key);
	msg_set_uint32(msg, AS_EXCHANGE_MSG_TYPE, type);
	EXCHANGE_UNLOCK();
}

/**
 * Get the msg buffer from a pool and fill in all compulsory fields.
 * @return the msg buff with compulsory fields filled in.
 */
static msg*
exchange_msg_get(as_exchange_msg_type type)
{
	msg* msg = as_fabric_msg_get(M_TYPE_EXCHANGE);
	exchange_msg_src_fill(msg, type);
	return msg;
}

/**
 * Return the message buffer back to the pool.
 */
static void
exchange_msg_return(msg* msg)
{
	as_fabric_msg_put(msg);
}

/**
 * Get message id.
 */
static int
exchange_msg_id_get(msg* msg, uint32_t* msg_id)
{
	if (msg_get_uint32(msg, AS_EXCHANGE_MSG_ID, msg_id) != 0) {
		return -1;
	}
	return 0;
}

/**
 * Get message type.
 */
static int
exchange_msg_type_get(msg* msg, as_exchange_msg_type* msg_type)
{
	if (msg_get_uint32(msg, AS_EXCHANGE_MSG_TYPE, msg_type) != 0) {
		return -1;
	}
	return 0;
}

/**
 * Get message cluster key.
 */
static int
exchange_msg_cluster_key_get(msg* msg, as_cluster_key* cluster_key)
{
	if (msg_get_uint64(msg, AS_EXCHANGE_MSG_CLUSTER_KEY, cluster_key) != 0) {
		return -1;
	}
	return 0;
}

/**
 * Set data payload for a message.
 */
static void
exchange_msg_data_payload_set(msg* msg)
{
	uint32_t ns_count = g_config.n_namespaces;

	cf_vector_define(namespace_list, sizeof(msg_buf_ele), ns_count, 0);
	cf_vector_define(partition_versions, sizeof(msg_buf_ele), ns_count, 0);
	uint32_t rack_ids[ns_count];

	bool have_roster = false;
	bool have_roster_rack_ids = false;
	uint32_t roster_generations[ns_count];
	cf_vector_define(rosters, sizeof(msg_buf_ele), ns_count, 0);
	cf_vector_define(rosters_rack_ids, sizeof(msg_buf_ele), ns_count, 0);

	bool have_regimes = false;
	uint32_t eventual_regimes[ns_count];
	uint32_t rebalance_regimes[ns_count];

	bool have_rebalance_flags = false;
	uint32_t rebalance_flags[ns_count];

	memset(rebalance_flags, 0, sizeof(rebalance_flags));

	msg_set_uint32(msg, AS_EXCHANGE_MSG_COMPATIBILITY_ID,
			AS_EXCHANGE_COMPATIBILITY_ID);

	for (uint32_t ns_ix = 0; ns_ix < ns_count; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		msg_buf_ele ns_ele = {
				.sz = (uint32_t)strlen(ns->name),
				.ptr = (uint8_t*)ns->name
		};

		msg_buf_ele pv_ele = {
				.sz = (uint32_t)g_exchange.self_data_dyn_buf[ns_ix].used_sz,
				.ptr = g_exchange.self_data_dyn_buf[ns_ix].buf
		};

		msg_buf_ele rn_ele = {
				.sz = (uint32_t)(ns->smd_roster_count * sizeof(cf_node)),
				.ptr = (uint8_t*)ns->smd_roster
		};

		msg_buf_ele rri_ele = {
				.sz = (uint32_t)(ns->smd_roster_count * sizeof(uint32_t)),
				.ptr = (uint8_t*)ns->smd_roster_rack_ids
		};

		cf_vector_append(&namespace_list, &ns_ele);
		cf_vector_append(&partition_versions, &pv_ele);
		rack_ids[ns_ix] = ns->rack_id;

		if (ns->smd_roster_generation != 0) {
			have_roster = true;

			if (! have_roster_rack_ids) {
				for (uint32_t n = 0; n < ns->smd_roster_count; n++) {
					if (ns->smd_roster_rack_ids[n] != 0) {
						have_roster_rack_ids = true;
						break;
					}
				}
			}
		}

		roster_generations[ns_ix] = ns->smd_roster_generation;
		cf_vector_append(&rosters, &rn_ele);
		cf_vector_append(&rosters_rack_ids, &rri_ele);

		eventual_regimes[ns_ix] = ns->eventual_regime;
		rebalance_regimes[ns_ix] = ns->rebalance_regime;

		if (eventual_regimes[ns_ix] != 0 || rebalance_regimes[ns_ix] != 0) {
			have_regimes = true;
		}

		if (ns->cfg_prefer_uniform_balance) {
			rebalance_flags[ns_ix] |= AS_EXCHANGE_REBALANCE_FLAG_UNIFORM;
		}

		if (ns->pending_quiesce) {
			rebalance_flags[ns_ix] |= AS_EXCHANGE_REBALANCE_FLAG_QUIESCE;
		}

		if (rebalance_flags[ns_ix] != 0) {
			have_rebalance_flags = true;
		}
	}

	msg_msgpack_list_set_buf(msg, AS_EXCHANGE_MSG_NAMESPACES, &namespace_list);
	msg_msgpack_list_set_buf(msg, AS_EXCHANGE_MSG_NS_PARTITION_VERSIONS,
			&partition_versions);
	msg_msgpack_list_set_uint32(msg, AS_EXCHANGE_MSG_NS_RACK_IDS, rack_ids,
			ns_count);

	if (have_roster) {
		msg_msgpack_list_set_uint32(msg, AS_EXCHANGE_MSG_NS_ROSTER_GENERATIONS,
				roster_generations, ns_count);
		msg_msgpack_list_set_buf(msg, AS_EXCHANGE_MSG_NS_ROSTERS, &rosters);

		if (have_roster_rack_ids) {
			msg_msgpack_list_set_buf(msg, AS_EXCHANGE_MSG_NS_ROSTERS_RACK_IDS,
					&rosters_rack_ids);
		}
	}

	if (have_regimes) {
		msg_msgpack_list_set_uint32(msg, AS_EXCHANGE_MSG_NS_EVENTUAL_REGIMES,
				eventual_regimes, ns_count);
		msg_msgpack_list_set_uint32(msg, AS_EXCHANGE_MSG_NS_REBALANCE_REGIMES,
				rebalance_regimes, ns_count);
	}

	if (have_rebalance_flags) {
		msg_msgpack_list_set_uint32(msg, AS_EXCHANGE_MSG_NS_REBALANCE_FLAGS,
				rebalance_flags, ns_count);
	}
}

/**
 * Check sanity of an incoming message. If this check passes the message is
 * guaranteed to have a valid protocol identifier, a valid type and a matching
 * cluster key, with the source node being a part of the cluster.
 * @return true if the message is valid, false if the message is invalid and
 * should be ignored.
 */
static bool
exchange_msg_is_sane(cf_node source, msg* msg)
{
	uint32_t id = 0;
	if (exchange_msg_id_get(msg, &id) != 0
			|| id != AS_EXCHANGE_PROTOCOL_IDENTIFIER) {
		DEBUG(
				"received exchange message with mismatching identifier - expected %u but was %u",
				AS_EXCHANGE_PROTOCOL_IDENTIFIER, id);
		return false;
	}

	as_exchange_msg_type msg_type = 0;

	if (exchange_msg_type_get(msg, &msg_type) != 0
			|| msg_type >= AS_EXCHANGE_MSG_TYPE_SENTINEL) {
		WARNING("received exchange message with invalid message type %u",
				msg_type);
		return false;
	}

	EXCHANGE_LOCK();
	as_cluster_key current_cluster_key = g_exchange.cluster_key;
	bool is_in_cluster = vector_find(&g_exchange.succession_list, &source) >= 0;
	EXCHANGE_UNLOCK();

	if (!is_in_cluster) {
		DEBUG("received exchange message from node %"PRIx64" not in cluster",
				source);
		return false;
	}

	as_cluster_key incoming_cluster_key = 0;
	if (exchange_msg_cluster_key_get(msg, &incoming_cluster_key) != 0
			|| (current_cluster_key != incoming_cluster_key)
			|| current_cluster_key == 0) {
		DEBUG("received exchange message with mismatching cluster key - expected %"PRIx64" but was %"PRIx64,
				current_cluster_key, incoming_cluster_key);
		return false;
	}

	return true;
}

/**
 * Send a message over fabric.
 *
 * @param msg the message to send.
 * @param dest the destination node.
 * @param error_msg the error message.
 */
static void
exchange_msg_send(msg* msg, cf_node dest, char* error_msg)
{
	if (as_fabric_send(dest, msg, AS_FABRIC_CHANNEL_CTRL)) {
		// Fabric will not return the message to the pool. Do it ourselves.
		exchange_msg_return(msg);
		WARNING("%s (dest:%"PRIx64")", error_msg, dest);
	}
}

/**
 * Send a message to a list of destination nodes.
 *
 * @param msg the message to send.
 * @param dests the node list to send the message to.
 * @param num_dests the number of destination nodes.
 * @param error_msg the error message.
 */
static void
exchange_msg_send_list(msg* msg, cf_node* dests, int num_dests, char* error_msg)
{
	if (as_fabric_send_list(dests, num_dests, msg, AS_FABRIC_CHANNEL_CTRL)
			!= 0) {
		// Fabric will not return the message to the pool. Do it ourselves.
		exchange_msg_return(msg);
		as_clustering_log_cf_node_array(CF_WARNING, AS_EXCHANGE, error_msg,
				dests, num_dests);
	}
}

/**
 * Send a commit message to a destination node.
 * @param dest the destination node.
 */
static void
exchange_commit_msg_send(cf_node dest)
{
	msg* commit_msg = exchange_msg_get(AS_EXCHANGE_MSG_TYPE_COMMIT);
	DEBUG("sending commit message to node %"PRIx64, dest);
	exchange_msg_send(commit_msg, dest, "error sending commit message");
}

/**
 * Send a commit message to a list of destination nodes.
 * @param dests the destination nodes.
 * @param num_dests the number of destination nodes.
 */
static void
exchange_commit_msg_send_all(cf_node* dests, int num_dests)
{
	msg* commit_msg = exchange_msg_get(AS_EXCHANGE_MSG_TYPE_COMMIT);
	as_clustering_log_cf_node_array(CF_DEBUG, AS_EXCHANGE,
			"sending commit message to nodes:", dests, num_dests);
	exchange_msg_send_list(commit_msg, dests, num_dests,
			"error sending commit message");
}

/**
 * Send ready to commit message to the principal.
 */
static void
exchange_ready_to_commit_msg_send()
{
	EXCHANGE_LOCK();
	g_exchange.ready_to_commit_send_ts = cf_getms();
	cf_node principal = g_exchange.principal;
	EXCHANGE_UNLOCK();

	msg* ready_to_commit_msg = exchange_msg_get(
			AS_EXCHANGE_MSG_TYPE_READY_TO_COMMIT);
	DEBUG("sending ready to commit message to node %"PRIx64, principal);
	exchange_msg_send(ready_to_commit_msg, principal,
			"error sending ready to commit message");
}

/**
 * Send exchange data to all nodes that have not acked the send.
 */
static void
exchange_data_msg_send_pending_ack()
{
	EXCHANGE_LOCK();
	g_exchange.send_ts = cf_getms();

	cf_node* unacked_nodes;
	int num_unacked_nodes;
	cf_vector* unacked_nodes_vector = cf_vector_stack_create(cf_node);

	exchange_nodes_find_send_unacked(unacked_nodes_vector);
	cf_vector_to_stack_array(unacked_nodes_vector, &unacked_nodes,
			&num_unacked_nodes);

	cf_vector_destroy(unacked_nodes_vector);

	if (!num_unacked_nodes) {
		goto Exit;
	}

	// FIXME - temporary assert, until we're sure.
	cf_assert(g_exchange.data_msg != NULL, AS_EXCHANGE, "payload not built");

	as_clustering_log_cf_node_array(CF_DEBUG, AS_EXCHANGE,
			"sending exchange data to nodes:", unacked_nodes,
			num_unacked_nodes);

	msg_incr_ref(g_exchange.data_msg);

	exchange_msg_send_list(g_exchange.data_msg, unacked_nodes,
			num_unacked_nodes, "error sending exchange data");
Exit:
	EXCHANGE_UNLOCK();
}

/**
 * Send a data ack message to a destination node.
 * @param dest the destination node.
 */
static void
exchange_data_ack_msg_send(cf_node dest)
{
	msg* ack_msg = exchange_msg_get(AS_EXCHANGE_MSG_TYPE_DATA_ACK);
	DEBUG("sending data ack message to node %"PRIx64, dest);
	exchange_msg_send(ack_msg, dest, "error sending data ack message");
}

/*
 * ----------------------------------------------------------------------------
 * Data payload related
 * ----------------------------------------------------------------------------
 */

/**
 * Add a pid to the namespace hash for the input vinfo.
 */
static void
exchange_namespace_hash_pid_add(cf_shash* ns_hash, as_partition_version* vinfo,
		uint16_t pid)
{
	if (as_partition_version_is_null(vinfo)) {
		// Ignore NULL vinfos.
		return;
	}

	cf_vector* pid_vector;

	// Append to the hash.
	if (cf_shash_get(ns_hash, vinfo, &pid_vector) != CF_SHASH_OK) {
		// We are seeing this vinfo for the first time.
		pid_vector = cf_vector_create(sizeof(uint16_t),
				AS_EXCHANGE_VINFO_NUM_PIDS_AVG, 0);
		cf_shash_put(ns_hash, vinfo, &pid_vector);
	}

	cf_vector_append(pid_vector, &pid);
}

/**
 * Destroy the pid vector for each vinfo.
 */
static int
exchange_namespace_hash_destroy_reduce(const void* key, void* data, void* udata)
{
	cf_vector* pid_vector = *(cf_vector**)data;
	cf_vector_destroy(pid_vector);
	return CF_SHASH_REDUCE_DELETE;
}

/**
 * Serialize each vinfo and accumulated pids to the input buffer.
 */
static int
exchange_namespace_hash_serialize_reduce(const void* key, void* data,
		void* udata)
{
	const as_partition_version* vinfo = (const as_partition_version*)key;
	cf_vector* pid_vector = *(cf_vector**)data;
	cf_dyn_buf* dyn_buf = (cf_dyn_buf*)udata;

	// Append the vinfo.
	cf_dyn_buf_append_buf(dyn_buf, (uint8_t*)vinfo, sizeof(*vinfo));

	// Append the count of pids.
	uint32_t num_pids = cf_vector_size(pid_vector);
	cf_dyn_buf_append_buf(dyn_buf, (uint8_t*)&num_pids, sizeof(num_pids));

	// Append each pid.
	for (int i = 0; i < num_pids; i++) {
		uint16_t* pid = cf_vector_getp(pid_vector, i);
		cf_dyn_buf_append_buf(dyn_buf, (uint8_t*)pid, sizeof(*pid));
	}

	return CF_SHASH_OK;
}

/**
 * Append namespace payload, in as_exchange_ns_vinfos_payload format, for a
 * namespace to the dynamic buffer.
 *
 * @param ns the namespace.
 * @param dyn_buf the dynamic buffer.
 */
static void
exchange_data_namespace_payload_add(as_namespace* ns, cf_dyn_buf* dyn_buf)
{
	// A hash from each unique non null vinfo to a vector of partition ids
	// having the vinfo.
	cf_shash* ns_hash = cf_shash_create(exchange_vinfo_shash,
			sizeof(as_partition_version), sizeof(cf_vector*),
			AS_EXCHANGE_UNIQUE_VINFO_MAX_SIZE_SOFT, 0);

	as_partition* partitions = ns->partitions;

	// Populate the hash with one entry for each vinfo.
	for (int i = 0; i < AS_PARTITIONS; i++) {
		as_partition_version* current_vinfo = &partitions[i].version;
		exchange_namespace_hash_pid_add(ns_hash, current_vinfo, i);
	}

	// We are ready to populate the dyn buffer with this ns's data.
	DEBUG("namespace %s has %d unique vinfos", ns->name,
			cf_shash_get_size(ns_hash));

	// Append the vinfo count.
	uint32_t num_vinfos = cf_shash_get_size(ns_hash);
	cf_dyn_buf_append_buf(dyn_buf, (uint8_t*)&num_vinfos, sizeof(num_vinfos));

	// Append vinfos and partitions.
	cf_shash_reduce(ns_hash, exchange_namespace_hash_serialize_reduce, dyn_buf);

	// Destroy the intermediate hash and the pid vectors.
	cf_shash_reduce(ns_hash, exchange_namespace_hash_destroy_reduce, NULL);

	cf_shash_destroy(ns_hash);
}

/**
 * Prepare the exchanged data payloads for the current exchange round.
 */
static void
exchange_data_payload_prepare()
{
	EXCHANGE_LOCK();

	// Block / abort migrations and freeze the partition version infos.
	as_partition_balance_disallow_migrations();
	as_partition_balance_synchronize_migrations();

	// Ensure ns->smd_roster is synchronized with exchanged partition versions.
	pthread_mutex_lock(&g_exchanged_info_lock);

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		// May change flags on partition versions we're about to exchange.
		as_partition_balance_protect_roster_set(ns);

		// Append payload for each namespace.

		// TODO - add API to reset dynbuf?
		g_exchange.self_data_dyn_buf[ns_ix].used_sz = 0;

		exchange_data_namespace_payload_add(ns,
				&g_exchange.self_data_dyn_buf[ns_ix]);
	}

	g_exchange.data_msg = exchange_msg_get(AS_EXCHANGE_MSG_TYPE_DATA);
	exchange_msg_data_payload_set(g_exchange.data_msg);

	pthread_mutex_unlock(&g_exchanged_info_lock);

	EXCHANGE_UNLOCK();
}

/**
 * Validate the per-namespace fields in an incoming data message.
 *
 * @return the number of namespaces sent, or 0 if the fields are invalid.
 */
static uint32_t
exchange_data_msg_get_num_namespaces(as_exchange_event* msg_event)
{
	uint32_t num_namespaces_sent = 0;
	uint32_t num_namespace_elements_sent = 0;

	if (!msg_msgpack_list_get_count(msg_event->msg,
			AS_EXCHANGE_MSG_NAMESPACES, &num_namespaces_sent)
			|| num_namespaces_sent > AS_NAMESPACE_SZ) {
		WARNING("received invalid namespaces from node %"PRIx64,
				msg_event->msg_source);
		return 0;
	}

	if (!msg_msgpack_list_get_count(msg_event->msg,
			AS_EXCHANGE_MSG_NS_PARTITION_VERSIONS,
			&num_namespace_elements_sent)
			|| num_namespaces_sent != num_namespace_elements_sent) {
		WARNING("received invalid partition versions from node %"PRIx64,
				msg_event->msg_source);
		return 0;
	}

	if (!msg_msgpack_list_get_count(msg_event->msg,
			AS_EXCHANGE_MSG_NS_RACK_IDS, &num_namespace_elements_sent)
			|| num_namespaces_sent != num_namespace_elements_sent) {
		WARNING("received invalid cluster groups from node %"PRIx64,
				msg_event->msg_source);
		return 0;
	}

	if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_ROSTER_GENERATIONS)
			&& (!msg_msgpack_list_get_count(msg_event->msg,
					AS_EXCHANGE_MSG_NS_ROSTER_GENERATIONS,
					&num_namespace_elements_sent)
					|| num_namespaces_sent != num_namespace_elements_sent)) {
		WARNING("received invalid roster generations from node %"PRIx64,
				msg_event->msg_source);
		return 0;
	}

	if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_ROSTERS)
			&& (!msg_msgpack_list_get_count(msg_event->msg,
					AS_EXCHANGE_MSG_NS_ROSTERS,
					&num_namespace_elements_sent)
					|| num_namespaces_sent != num_namespace_elements_sent)) {
		WARNING("received invalid rosters from node %"PRIx64,
				msg_event->msg_source);
		return 0;
	}

	if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_ROSTERS_RACK_IDS)
			&& (!msg_msgpack_list_get_count(msg_event->msg,
					AS_EXCHANGE_MSG_NS_ROSTERS_RACK_IDS,
					&num_namespace_elements_sent)
					|| num_namespaces_sent != num_namespace_elements_sent)) {
		WARNING("received invalid rosters-rack-ids from node %"PRIx64,
				msg_event->msg_source);
		return 0;
	}

	if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_EVENTUAL_REGIMES)
			&& (!msg_msgpack_list_get_count(msg_event->msg,
					AS_EXCHANGE_MSG_NS_EVENTUAL_REGIMES,
					&num_namespace_elements_sent)
					|| num_namespaces_sent != num_namespace_elements_sent)) {
		WARNING("received invalid eventual regimes from node %"PRIx64,
				msg_event->msg_source);
		return 0;
	}

	if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_REBALANCE_REGIMES)
			&& (!msg_msgpack_list_get_count(msg_event->msg,
					AS_EXCHANGE_MSG_NS_REBALANCE_REGIMES,
					&num_namespace_elements_sent)
					|| num_namespaces_sent != num_namespace_elements_sent)) {
		WARNING("received invalid rebalance regimes from node %"PRIx64,
				msg_event->msg_source);
		return 0;
	}

	if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_REBALANCE_FLAGS)
			&& (!msg_msgpack_list_get_count(msg_event->msg,
					AS_EXCHANGE_MSG_NS_REBALANCE_FLAGS,
					&num_namespace_elements_sent)
					|| num_namespaces_sent != num_namespace_elements_sent)) {
		WARNING("received invalid rebalance flags from node %"PRIx64,
				msg_event->msg_source);
		return 0;
	}

	return num_namespaces_sent;
}
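
/*
 * Note on the message layout: the per-namespace fields checked above are
 * parallel msgpack lists - element i of each list describes the namespace at
 * index i of AS_EXCHANGE_MSG_NAMESPACES. The roster, regime and flag fields
 * are optional - they are validated only when set - but when present each
 * must carry exactly one element per sent namespace.
 */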

/**
 * Basic validation for an incoming namespace payload.
 * Validates that
 * 1. The number of vinfos does not exceed AS_PARTITIONS.
 * 2. Each partition id is less than AS_PARTITIONS.
 * 3. The payload fits within ns_payload_size, with no unaccounted-for
 * trailing bytes.
 *
 * @param ns_payload pointer to the start of the namespace payload.
 * @param ns_payload_size the size of the input namespace payload.
 * @return true if this is a valid payload.
 */
static bool
exchange_namespace_payload_is_valid(as_exchange_ns_vinfos_payload* ns_payload,
		uint32_t ns_payload_size)
{
	// Pointer past the last byte in the payload.
	uint8_t* payload_end_ptr = (uint8_t*)ns_payload + ns_payload_size;

	if ((uint8_t*)ns_payload->vinfos > payload_end_ptr) {
		return false;
	}

	if (ns_payload->num_vinfos > AS_PARTITIONS) {
		return false;
	}

	uint8_t* read_ptr = (uint8_t*)ns_payload->vinfos;

	for (uint32_t i = 0; i < ns_payload->num_vinfos; i++) {
		if (read_ptr >= payload_end_ptr) {
			return false;
		}

		as_exchange_vinfo_payload* vinfo_payload =
				(as_exchange_vinfo_payload*)read_ptr;

		if ((uint8_t*)vinfo_payload->pids > payload_end_ptr) {
			return false;
		}

		if (vinfo_payload->num_pids > AS_PARTITIONS) {
			return false;
		}

		size_t pids_size = vinfo_payload->num_pids * sizeof(uint16_t);

		if ((uint8_t*)vinfo_payload->pids + pids_size > payload_end_ptr) {
			return false;
		}

		for (uint32_t j = 0; j < vinfo_payload->num_pids; j++) {
			if (vinfo_payload->pids[j] >= AS_PARTITIONS) {
				return false;
			}
		}

		read_ptr += sizeof(as_exchange_vinfo_payload) + pids_size;
	}

	if (read_ptr != payload_end_ptr) {
		// There are unaccounted-for extra bytes in the payload.
		return false;
	}

	return true;
}
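
/*
 * Illustrative layout of the namespace payload walked above, using the field
 * names from the validation logic (exact sizes depend on the struct
 * definitions and any packing in force):
 *
 *   as_exchange_ns_vinfos_payload:
 *     [ num_vinfos | vinfo payload 0 | vinfo payload 1 | ... ]
 *
 *   each as_exchange_vinfo_payload:
 *     [ vinfo | num_pids | pids[0] ... pids[num_pids - 1] ]  (pids are u16)
 *
 * A payload is valid only if this walk ends exactly at payload_end_ptr, with
 * every pid < AS_PARTITIONS.
 */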

/*
 * ----------------------------------------------------------------------------
 * Common across all states
 * ----------------------------------------------------------------------------
 */

/**
 * Update committed exchange cluster data.
 * @param cluster_key the new cluster key.
 * @param succession the new succession list. Can be NULL for the orphan
 * state.
 */
static void
exchange_commited_cluster_update(as_cluster_key cluster_key,
		cf_vector* succession)
{
	EXCHANGE_COMMITTED_CLUSTER_WLOCK();
	g_exchange.committed_cluster_key = cluster_key;
	vector_clear(&g_exchange.committed_succession_list);

	if (succession && cf_vector_size(succession) > 0) {
		vector_copy(&g_exchange.committed_succession_list,
				&g_exchange.succession_list);
		g_exchange.committed_cluster_size = cf_vector_size(succession);
		cf_vector_get(succession, 0, &g_exchange.committed_principal);
	}
	else {
		g_exchange.committed_cluster_size = 0;
		g_exchange.committed_principal = 0;
	}

	g_exchange.committed_cluster_generation++;
	EXCHANGE_COMMITTED_CLUSTER_UNLOCK();
}

/**
 * Indicates if the self node is the cluster principal.
 */
static bool
exchange_self_is_principal()
{
	EXCHANGE_LOCK();
	bool is_principal = (g_config.self_node == g_exchange.principal);
	EXCHANGE_UNLOCK();
	return is_principal;
}

/**
 * Dump exchange state.
 */
static void
exchange_dump(cf_fault_severity severity, bool verbose)
{
	EXCHANGE_LOCK();
	cf_vector* node_vector = cf_vector_stack_create(cf_node);

	char* state_str = "";
	switch (g_exchange.state) {
	case AS_EXCHANGE_STATE_REST:
		state_str = "rest";
		break;
	case AS_EXCHANGE_STATE_EXCHANGING:
		state_str = "exchanging";
		break;
	case AS_EXCHANGE_STATE_READY_TO_COMMIT:
		state_str = "ready to commit";
		break;
	case AS_EXCHANGE_STATE_ORPHANED:
		state_str = "orphaned";
		break;
	}

	LOG(severity, "EXG: state: %s", state_str);

	if (g_exchange.state == AS_EXCHANGE_STATE_ORPHANED) {
		LOG(severity, "EXG: client transactions blocked: %s",
				g_exchange.orphan_state_are_transactions_blocked ?
						"true" : "false");
		LOG(severity, "EXG: orphan since: %"PRIu64" (millis)",
				cf_getms() - g_exchange.orphan_state_start_time);
	}
	else {
		LOG(severity, "EXG: cluster key: %"PRIx64, g_exchange.cluster_key);
		as_clustering_log_cf_node_vector(severity, AS_EXCHANGE,
				"EXG: succession:", &g_exchange.succession_list);

		if (verbose) {
			vector_clear(node_vector);
			exchange_nodes_find_send_unacked(node_vector);
			as_clustering_log_cf_node_vector(severity, AS_EXCHANGE,
					"EXG: send pending:", node_vector);

			vector_clear(node_vector);
			exchange_nodes_find_not_received(node_vector);
			as_clustering_log_cf_node_vector(severity, AS_EXCHANGE,
					"EXG: receive pending:", node_vector);

			if (exchange_self_is_principal()) {
				vector_clear(node_vector);
				exchange_nodes_find_not_ready_to_commit(node_vector);
				as_clustering_log_cf_node_vector(severity, AS_EXCHANGE,
						"EXG: ready to commit pending:", node_vector);
			}
		}
	}

	cf_vector_destroy(node_vector);
	EXCHANGE_UNLOCK();
}

/**
 * Reset state for a new round of exchange, reusing as much heap-allocated
 * space for exchanged data as possible.
 * @param new_succession_list the new succession list. Can be NULL for the
 * orphaned state.
 * @param new_cluster_key the new cluster key - 0 for the orphaned state.
 */
static void
exchange_reset_for_new_round(cf_vector* new_succession_list,
		as_cluster_key new_cluster_key)
{
	EXCHANGE_LOCK();
	vector_clear(&g_exchange.succession_list);
	g_exchange.principal = 0;

	if (new_succession_list && cf_vector_size(new_succession_list) > 0) {
		vector_copy(&g_exchange.succession_list, new_succession_list);
		// Set the principal node.
		cf_vector_get(&g_exchange.succession_list, 0, &g_exchange.principal);
	}

	// Reset accumulated node states.
	exchange_node_states_reset();

	g_exchange.cluster_key = new_cluster_key;

	if (g_exchange.data_msg) {
		as_fabric_msg_put(g_exchange.data_msg);
		g_exchange.data_msg = NULL;
	}

	EXCHANGE_UNLOCK();
}

/**
 * Commit exchange state to reflect self node being an orphan.
 */
static void
exchange_orphan_commit()
{
	EXCHANGE_LOCK();
	exchange_commited_cluster_update(0, NULL);
	WARNING("blocking client transactions in orphan state!");
	as_partition_balance_revert_to_orphan();
	g_exchange.orphan_state_are_transactions_blocked = true;
	EXCHANGE_UNLOCK();
}

/**
 * Receive an orphaned event and abort the current round.
 */
static void
exchange_orphaned_handle(as_clustering_event* orphaned_event)
{
	DEBUG("got orphaned event");

	EXCHANGE_LOCK();

	if (g_exchange.state != AS_EXCHANGE_STATE_REST
			&& g_exchange.state != AS_EXCHANGE_STATE_ORPHANED) {
		INFO("aborting partition exchange with cluster key %"PRIx64,
				g_exchange.cluster_key);
	}

	g_exchange.state = AS_EXCHANGE_STATE_ORPHANED;
	exchange_reset_for_new_round(NULL, 0);

	// Stop ongoing migrations if any.
	as_partition_balance_disallow_migrations();
	as_partition_balance_synchronize_migrations();

	// Update the time this node got into orphan state.
	g_exchange.orphan_state_start_time = cf_getms();

	// Potentially a temporary orphan state. We will time out and commit the
	// orphan state if it persists too long.
	g_exchange.orphan_state_are_transactions_blocked = false;

	EXCHANGE_UNLOCK();
}

/**
 * Receive a cluster change event and start a new data exchange round.
 */
static void
exchange_cluster_change_handle(as_clustering_event* clustering_event)
{
	EXCHANGE_LOCK();

	DEBUG("got cluster change event");

	if (g_exchange.state != AS_EXCHANGE_STATE_REST
			&& g_exchange.state != AS_EXCHANGE_STATE_ORPHANED) {
		INFO("aborting partition exchange with cluster key %"PRIx64,
				g_exchange.cluster_key);
	}

	exchange_reset_for_new_round(clustering_event->succession_list,
			clustering_event->cluster_key);

	g_exchange.state = AS_EXCHANGE_STATE_EXCHANGING;

	INFO("data exchange started with cluster key %"PRIx64,
			g_exchange.cluster_key);

	// Prepare the data payload.
	exchange_data_payload_prepare();

	EXCHANGE_UNLOCK();

	exchange_data_msg_send_pending_ack();
}

/**
 * Handle a clustering event.
 * @param exchange_clustering_event the incoming clustering event.
 */
static void
exchange_clustering_event_handle(as_exchange_event* exchange_clustering_event)
{
	as_clustering_event* clustering_event =
			exchange_clustering_event->clustering_event;

	switch (clustering_event->type) {
	case AS_CLUSTERING_ORPHANED:
		exchange_orphaned_handle(clustering_event);
		break;
	case AS_CLUSTERING_CLUSTER_CHANGED:
		exchange_cluster_change_handle(clustering_event);
		break;
	}
}

/*
 * ----------------------------------------------------------------------------
 * Orphan state event handling
 * ----------------------------------------------------------------------------
 */

/**
 * The wait time in the orphan state after which client transactions and
 * transaction-related interactions (e.g. valid partition map publishing)
 * should be blocked.
 */
static uint32_t
exchange_orphan_transaction_block_timeout()
{
	return (uint32_t)as_clustering_quantum_interval()
			* AS_EXCHANGE_REVERT_ORPHAN_INTERVALS;
}
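
/*
 * For example (illustrative numbers only - the quantum interval is determined
 * by the clustering module's configuration), with a 1000 ms quantum and
 * AS_EXCHANGE_REVERT_ORPHAN_INTERVALS of 5, a node that stays orphaned for
 * more than 5000 ms commits the orphan state and blocks client transactions.
 */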

/**
 * Handle the timer event and if we have been an orphan for too long, block
 * client transactions.
 */
static void
exchange_orphan_timer_event_handle()
{
	uint32_t timeout = exchange_orphan_transaction_block_timeout();
	EXCHANGE_LOCK();
	if (!g_exchange.orphan_state_are_transactions_blocked
			&& g_exchange.orphan_state_start_time + timeout < cf_getms()) {
		exchange_orphan_commit();
	}
	EXCHANGE_UNLOCK();
}

/**
 * Event processing in the orphan state.
 */
static void
exchange_orphan_event_handle(as_exchange_event* event)
{
	switch (event->type) {
	case AS_EXCHANGE_EVENT_CLUSTER_CHANGE:
		exchange_clustering_event_handle(event);
		break;
	case AS_EXCHANGE_EVENT_TIMER:
		exchange_orphan_timer_event_handle();
		break;
	default:
		break;
	}
}

/*
 * ----------------------------------------------------------------------------
 * Rest state event handling
 * ----------------------------------------------------------------------------
 */

/**
 * Process a message event when in rest state.
 */
static void
exchange_rest_msg_event_handle(as_exchange_event* msg_event)
{
	EXCHANGE_LOCK();

	if (!exchange_msg_is_sane(msg_event->msg_source, msg_event->msg)) {
		goto Exit;
	}

	as_exchange_msg_type msg_type;
	exchange_msg_type_get(msg_event->msg, &msg_type);

	if (exchange_self_is_principal()
			&& msg_type == AS_EXCHANGE_MSG_TYPE_READY_TO_COMMIT) {
		// The commit message did not make it to the source node, hence it
		// sent us the ready to commit message. Resend the commit message.
		DEBUG("received a ready to commit message from %"PRIx64,
				msg_event->msg_source);
		exchange_commit_msg_send(msg_event->msg_source);
	}
	else {
		DEBUG(
				"rest state received unexpected message of type %d from node %"PRIx64,
				msg_type, msg_event->msg_source);
	}

Exit:
	EXCHANGE_UNLOCK();
}

/**
 * Event processing in the rest state.
 */
static void
exchange_rest_event_handle(as_exchange_event* event)
{
	switch (event->type) {
	case AS_EXCHANGE_EVENT_CLUSTER_CHANGE:
		exchange_clustering_event_handle(event);
		break;
	case AS_EXCHANGE_EVENT_MSG:
		exchange_rest_msg_event_handle(event);
		break;
	default:
		break;
	}
}

/*
 * ----------------------------------------------------------------------------
 * Exchanging state event handling
 * ----------------------------------------------------------------------------
 */

/**
 * Commit namespace payload for a node.
 * Assumes the namespace vinfo and succession list have been zeroed beforehand.
 */
static void
exchange_namespace_payload_pre_commit_for_node(cf_node node,
		as_exchange_node_namespace_data* namespace_data)
{
	as_namespace* ns = namespace_data->local_namespace;

	uint32_t sl_ix = ns->cluster_size++;

	ns->succession[sl_ix] = node;

	as_exchange_ns_vinfos_payload* ns_payload =
			namespace_data->partition_versions;
	uint8_t* read_ptr = (uint8_t*)ns_payload->vinfos;

	for (uint32_t i = 0; i < ns_payload->num_vinfos; i++) {
		as_exchange_vinfo_payload* vinfo_payload =
				(as_exchange_vinfo_payload*)read_ptr;

		for (uint32_t j = 0; j < vinfo_payload->num_pids; j++) {
			memcpy(&ns->cluster_versions[sl_ix][vinfo_payload->pids[j]],
					&vinfo_payload->vinfo, sizeof(vinfo_payload->vinfo));
		}

		read_ptr += sizeof(as_exchange_vinfo_payload)
				+ vinfo_payload->num_pids * sizeof(uint16_t);
	}

	ns->rack_ids[sl_ix] = namespace_data->rack_id;

	if (namespace_data->roster_generation > ns->roster_generation) {
		ns->roster_generation = namespace_data->roster_generation;
		ns->roster_count = namespace_data->roster_count;

		memcpy(ns->roster, namespace_data->roster,
				ns->roster_count * sizeof(cf_node));

		if (namespace_data->roster_rack_ids) {
			memcpy(ns->roster_rack_ids, namespace_data->roster_rack_ids,
					ns->roster_count * sizeof(uint32_t));
		}
		else {
			memset(ns->roster_rack_ids, 0, ns->roster_count * sizeof(uint32_t));
		}
	}

	if (ns->eventual_regime != 0
			&& namespace_data->eventual_regime > ns->eventual_regime) {
		ns->eventual_regime = namespace_data->eventual_regime;
	}

	ns->rebalance_regimes[sl_ix] = namespace_data->rebalance_regime;

	// Prefer uniform balance only if all nodes prefer it.
	if ((namespace_data->rebalance_flags &
			AS_EXCHANGE_REBALANCE_FLAG_UNIFORM) == 0) {
		ns->prefer_uniform_balance = false;
	}

	bool is_node_quiesced = (namespace_data->rebalance_flags &
			AS_EXCHANGE_REBALANCE_FLAG_QUIESCE) != 0;

	if (node == g_config.self_node) {
		ns->is_quiesced = is_node_quiesced;
	}

	ns->quiesced[sl_ix] = is_node_quiesced;
}
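
/*
 * Illustration: if this node's payload carries a vinfo V listing pids
 * { 0, 5, 9 }, the loop above writes V to cluster_versions[sl_ix][0],
 * cluster_versions[sl_ix][5] and cluster_versions[sl_ix][9]; every pid not
 * listed under any vinfo keeps the zeroed "null" version for this node.
 */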

/**
 * Pre-commit exchange data for a given node.
 */
static void
exchange_data_pre_commit_for_node(cf_node node, uint32_t ix)
{
	EXCHANGE_LOCK();
	as_exchange_node_state node_state;
	exchange_node_state_get_safe(node, &node_state);

	g_exchange.compatibility_ids[ix] = node_state.data->compatibility_id;

	if (node_state.data->compatibility_id < g_exchange.min_compatibility_id) {
		g_exchange.min_compatibility_id = node_state.data->compatibility_id;
	}

	for (uint32_t i = 0; i < node_state.data->num_namespaces; i++) {
		exchange_namespace_payload_pre_commit_for_node(node,
				&node_state.data->namespace_data[i]);
	}

	EXCHANGE_UNLOCK();
}

/**
 * Check that there's not a mixture of AP and CP nodes in any namespace.
 */
static bool
exchange_data_pre_commit_ap_cp_check()
{
	for (uint32_t i = 0; i < g_config.n_namespaces; i++) {
		as_namespace* ns = g_config.namespaces[i];

		cf_node ap_node = (cf_node)0;
		cf_node cp_node = (cf_node)0;

		for (uint32_t n = 0; n < ns->cluster_size; n++) {
			if (ns->rebalance_regimes[n] == 0) {
				ap_node = ns->succession[n];
			}
			else {
				cp_node = ns->succession[n];
			}
		}

		if (ap_node != (cf_node)0 && cp_node != (cf_node)0) {
			WARNING("{%s} has mixture of AP and SC nodes - for example %lx is AP and %lx is SC",
					ns->name, ap_node, cp_node);
			return false;
		}
	}

	return true;
}

/**
 * Pre-commit namespace data anticipating a successful commit from the
 * principal. This pre-commit ensures the regime advances in CP mode, to cover
 * the case where the principal commits exchange data but the commit to a
 * non-principal is lost.
 */
static bool
exchange_exchanging_pre_commit()
{
	EXCHANGE_LOCK();
	pthread_mutex_lock(&g_exchanged_info_lock);

	memset(g_exchange.compatibility_ids, 0,
			sizeof(g_exchange.compatibility_ids));
	g_exchange.min_compatibility_id = UINT32_MAX;

	// Reset exchange data for all namespaces.
	for (uint32_t i = 0; i < g_config.n_namespaces; i++) {
		as_namespace* ns = g_config.namespaces[i];
		memset(ns->succession, 0, sizeof(ns->succession));

		// Assuming zero represents the "null" partition version.
		memset(ns->cluster_versions, 0, sizeof(ns->cluster_versions));

		memset(ns->rack_ids, 0, sizeof(ns->rack_ids));

		ns->is_quiesced = false;
		memset(ns->quiesced, 0, sizeof(ns->quiesced));

		ns->roster_generation = 0;
		ns->roster_count = 0;
		memset(ns->roster, 0, sizeof(ns->roster));
		memset(ns->roster_rack_ids, 0, sizeof(ns->roster_rack_ids));

		// Note - not clearing ns->eventual_regime - prior non-0 value means CP.
		// Note - not clearing ns->rebalance_regime - it's not set here.
		memset(ns->rebalance_regimes, 0, sizeof(ns->rebalance_regimes));

		ns->cluster_size = 0;

		// Any node that does not prefer uniform balance will set this false.
		ns->prefer_uniform_balance = true;
	}

	// Fill the namespace partition version info in succession list order.
	uint32_t num_nodes = cf_vector_size(&g_exchange.succession_list);

	for (uint32_t i = 0; i < num_nodes; i++) {
		cf_node node;
		cf_vector_get(&g_exchange.succession_list, i, &node);
		exchange_data_pre_commit_for_node(node, i);
	}

	// Collected all exchanged data - do final configuration consistency
	// checks.
	if (!exchange_data_pre_commit_ap_cp_check()) {
		WARNING("abandoned exchange - fix configuration conflict");
		pthread_mutex_unlock(&g_exchanged_info_lock);
		EXCHANGE_UNLOCK();
		return false;
	}

	for (uint32_t i = 0; i < g_config.n_namespaces; i++) {
		as_namespace* ns = g_config.namespaces[i];

		if (ns->eventual_regime != 0) {
			ns->eventual_regime += 2;

			as_storage_save_regime(ns);

			INFO("{%s} eventual-regime %u ready", ns->name,
					ns->eventual_regime);
		}
	}

	pthread_mutex_unlock(&g_exchanged_info_lock);
	EXCHANGE_UNLOCK();

	return true;
}

/**
 * Check to see if all exchange data is sent and received. If so, switch to
 * the ready_to_commit state.
 */
static void
exchange_exchanging_check_switch_ready_to_commit()
{
	EXCHANGE_LOCK();

	cf_vector* node_vector = cf_vector_stack_create(cf_node);

	if (g_exchange.state == AS_EXCHANGE_STATE_REST
			|| g_exchange.cluster_key == 0) {
		goto Exit;
	}

	exchange_nodes_find_send_unacked(node_vector);
	if (cf_vector_size(node_vector) > 0) {
		// We still have unacked exchange send messages.
		goto Exit;
	}

	vector_clear(node_vector);
	exchange_nodes_find_not_received(node_vector);
	if (cf_vector_size(node_vector) > 0) {
		// We still haven't received exchange messages from all nodes in the
		// succession list.
		goto Exit;
	}

	if (!exchange_exchanging_pre_commit()) {
		// Pre-commit failed. We are not ready to commit.
		goto Exit;
	}

	g_exchange.state = AS_EXCHANGE_STATE_READY_TO_COMMIT;

	DEBUG("ready to commit exchange data for cluster key %"PRIx64,
			g_exchange.cluster_key);

Exit:
	cf_vector_destroy(node_vector);

	if (g_exchange.state == AS_EXCHANGE_STATE_READY_TO_COMMIT) {
		exchange_ready_to_commit_msg_send();
	}

	EXCHANGE_UNLOCK();
}

/**
 * Handle incoming data message.
 *
 * Assumes the message has been checked for sanity.
 */
static void
exchange_exchanging_data_msg_handle(as_exchange_event* msg_event)
{
	EXCHANGE_LOCK();

	DEBUG("received exchange data from node %"PRIx64, msg_event->msg_source);

	as_exchange_node_state node_state;
	exchange_node_state_get_safe(msg_event->msg_source, &node_state);

	if (!node_state.received) {
		node_state.data->compatibility_id = 0;
		msg_get_uint32(msg_event->msg, AS_EXCHANGE_MSG_COMPATIBILITY_ID,
				&node_state.data->compatibility_id);

		uint32_t num_namespaces_sent = exchange_data_msg_get_num_namespaces(
				msg_event);

		if (num_namespaces_sent == 0) {
			WARNING("ignoring invalid exchange data from node %"PRIx64,
					msg_event->msg_source);
			goto Exit;
		}

		cf_vector_define(namespace_list, sizeof(msg_buf_ele),
				num_namespaces_sent, 0);
		cf_vector_define(partition_versions, sizeof(msg_buf_ele),
				num_namespaces_sent, 0);
		uint32_t rack_ids[num_namespaces_sent];

		uint32_t roster_generations[num_namespaces_sent];
		cf_vector_define(rosters, sizeof(msg_buf_ele), num_namespaces_sent, 0);
		cf_vector_define(rosters_rack_ids, sizeof(msg_buf_ele),
				num_namespaces_sent, 0);

		memset(roster_generations, 0, sizeof(roster_generations));

		uint32_t eventual_regimes[num_namespaces_sent];
		uint32_t rebalance_regimes[num_namespaces_sent];
		uint32_t rebalance_flags[num_namespaces_sent];

		memset(eventual_regimes, 0, sizeof(eventual_regimes));
		memset(rebalance_regimes, 0, sizeof(rebalance_regimes));
		memset(rebalance_flags, 0, sizeof(rebalance_flags));

		if (!msg_msgpack_list_get_buf_array_presized(msg_event->msg,
				AS_EXCHANGE_MSG_NAMESPACES, &namespace_list)) {
			WARNING("received invalid namespaces from node %"PRIx64,
					msg_event->msg_source);
			goto Exit;
		}

		if (!msg_msgpack_list_get_buf_array_presized(msg_event->msg,
				AS_EXCHANGE_MSG_NS_PARTITION_VERSIONS, &partition_versions)) {
			WARNING("received invalid partition versions from node %"PRIx64,
					msg_event->msg_source);
			goto Exit;
		}

		uint32_t num_rack_ids = num_namespaces_sent;

		if (!msg_msgpack_list_get_uint32_array(msg_event->msg,
				AS_EXCHANGE_MSG_NS_RACK_IDS, rack_ids, &num_rack_ids)) {
			WARNING("received invalid cluster groups from node %"PRIx64,
					msg_event->msg_source);
			goto Exit;
		}

		uint32_t num_roster_generations = num_namespaces_sent;

		if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_ROSTER_GENERATIONS)
				&& !msg_msgpack_list_get_uint32_array(msg_event->msg,
						AS_EXCHANGE_MSG_NS_ROSTER_GENERATIONS,
						roster_generations, &num_roster_generations)) {
			WARNING("received invalid roster generations from node %"PRIx64,
					msg_event->msg_source);
			goto Exit;
		}

		if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_ROSTERS)
				&& !msg_msgpack_list_get_buf_array_presized(msg_event->msg,
						AS_EXCHANGE_MSG_NS_ROSTERS, &rosters)) {
			WARNING("received invalid rosters from node %"PRIx64,
					msg_event->msg_source);
			goto Exit;
		}

		if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_ROSTERS_RACK_IDS)
				&& !msg_msgpack_list_get_buf_array_presized(msg_event->msg,
						AS_EXCHANGE_MSG_NS_ROSTERS_RACK_IDS,
						&rosters_rack_ids)) {
			WARNING("received invalid rosters-rack-ids from node %"PRIx64,
					msg_event->msg_source);
			goto Exit;
		}

		uint32_t num_eventual_regimes = num_namespaces_sent;

		if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_EVENTUAL_REGIMES)
				&& !msg_msgpack_list_get_uint32_array(msg_event->msg,
						AS_EXCHANGE_MSG_NS_EVENTUAL_REGIMES, eventual_regimes,
						&num_eventual_regimes)) {
			WARNING("received invalid eventual regimes from node %"PRIx64,
					msg_event->msg_source);
			goto Exit;
		}

		uint32_t num_rebalance_regimes = num_namespaces_sent;

		if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_REBALANCE_REGIMES)
				&& !msg_msgpack_list_get_uint32_array(msg_event->msg,
						AS_EXCHANGE_MSG_NS_REBALANCE_REGIMES, rebalance_regimes,
						&num_rebalance_regimes)) {
			WARNING("received invalid rebalance regimes from node %"PRIx64,
					msg_event->msg_source);
			goto Exit;
		}

		uint32_t num_rebalance_flags = num_namespaces_sent;

		if (msg_is_set(msg_event->msg, AS_EXCHANGE_MSG_NS_REBALANCE_FLAGS)
				&& !msg_msgpack_list_get_uint32_array(msg_event->msg,
						AS_EXCHANGE_MSG_NS_REBALANCE_FLAGS, rebalance_flags,
						&num_rebalance_flags)) {
			WARNING("received invalid rebalance flags from node %"PRIx64,
					msg_event->msg_source);
			goto Exit;
		}

		node_state.data->num_namespaces = 0;

		for (uint32_t i = 0; i < num_namespaces_sent; i++) {
			msg_buf_ele* namespace_name_element = cf_vector_getp(
					&namespace_list, i);

			// Find a match for the namespace.
			as_namespace* matching_namespace = as_namespace_get_bybuf(
					namespace_name_element->ptr, namespace_name_element->sz);

			if (!matching_namespace) {
				continue;
			}

			as_exchange_node_namespace_data* namespace_data =
					&node_state.data->namespace_data[node_state.data->num_namespaces];
			node_state.data->num_namespaces++;

			namespace_data->local_namespace = matching_namespace;
			namespace_data->rack_id = rack_ids[i];
			namespace_data->roster_generation = roster_generations[i];
			namespace_data->eventual_regime = eventual_regimes[i];
			namespace_data->rebalance_regime = rebalance_regimes[i];
			namespace_data->rebalance_flags = rebalance_flags[i];

			// Copy partition versions.
			msg_buf_ele* partition_versions_element = cf_vector_getp(
					&partition_versions, i);

			if (!exchange_namespace_payload_is_valid(
					(as_exchange_ns_vinfos_payload*)partition_versions_element->ptr,
					partition_versions_element->sz)) {
				WARNING(
						"received invalid partition versions for namespace %s from node %"PRIx64,
						matching_namespace->name, msg_event->msg_source);
				goto Exit;
			}

			namespace_data->partition_versions = cf_realloc(
					namespace_data->partition_versions,
					partition_versions_element->sz);

			memcpy(namespace_data->partition_versions,
					partition_versions_element->ptr,
					partition_versions_element->sz);

			// Copy rosters.
			// TODO - make this piece a utility function?
			if (namespace_data->roster_generation == 0) {
				namespace_data->roster_count = 0;
			}
			else {
				msg_buf_ele* roster_ele = cf_vector_getp(&rosters, i);

				namespace_data->roster_count = roster_ele->sz / sizeof(cf_node);

				if (namespace_data->roster_count == 0
						|| namespace_data->roster_count > AS_CLUSTER_SZ
						|| roster_ele->sz % sizeof(cf_node) != 0) {
					WARNING(
							"received invalid roster for namespace %s from node %"PRIx64,
							matching_namespace->name, msg_event->msg_source);
					goto Exit;
				}

				namespace_data->roster = cf_realloc(namespace_data->roster,
						roster_ele->sz);

				memcpy(namespace_data->roster, roster_ele->ptr, roster_ele->sz);

				uint32_t rri_ele_sz = 0;

				if (cf_vector_size(&rosters_rack_ids) != 0) {
					msg_buf_ele* rri_ele = cf_vector_getp(&rosters_rack_ids, i);

					if (rri_ele->sz != 0) {
						rri_ele_sz = rri_ele->sz;

						if (rri_ele_sz != namespace_data->roster_count
								* sizeof(uint32_t)) {
							WARNING(
									"received invalid roster-rack-ids for namespace %s from node %"PRIx64,
									matching_namespace->name,
									msg_event->msg_source);
							goto Exit;
						}

						namespace_data->roster_rack_ids = cf_realloc(
								namespace_data->roster_rack_ids, rri_ele_sz);

						memcpy(namespace_data->roster_rack_ids, rri_ele->ptr,
								rri_ele_sz);
					}
				}

				if (rri_ele_sz == 0 && namespace_data->roster_rack_ids) {
					cf_free(namespace_data->roster_rack_ids);
					namespace_data->roster_rack_ids = NULL;
				}
			}
		}

		// Mark exchange data received from the source.
		node_state.received = true;
		exchange_node_state_update(msg_event->msg_source, &node_state);
	}
	else {
		// Duplicate exchange data received. Ignore.
		INFO("received duplicate exchange data from node %"PRIx64,
				msg_event->msg_source);
	}

	// Send an acknowledgement.
	exchange_data_ack_msg_send(msg_event->msg_source);

	// Check if we can switch to ready to commit state.
	exchange_exchanging_check_switch_ready_to_commit();

Exit:
	EXCHANGE_UNLOCK();
}

/**
 * Handle incoming data ack message.
 *
 * Assumes the message has been checked for sanity.
 */
static void
exchange_exchanging_data_ack_msg_handle(as_exchange_event* msg_event)
{
	EXCHANGE_LOCK();

	DEBUG("received exchange data ack from node %"PRIx64,
			msg_event->msg_source);

	as_exchange_node_state node_state;
	exchange_node_state_get_safe(msg_event->msg_source, &node_state);

	if (!node_state.send_acked) {
		// Mark send as acked in the node state.
		node_state.send_acked = true;
		exchange_node_state_update(msg_event->msg_source, &node_state);
	}
	else {
		// Duplicate ack. Ignore.
		DEBUG("received duplicate data ack from node %"PRIx64,
				msg_event->msg_source);
	}

	// We might have sent and received all partition info. Check for
	// completion.
	exchange_exchanging_check_switch_ready_to_commit();

	EXCHANGE_UNLOCK();
}

/**
 * Process a message event when in exchanging state.
 */
static void
exchange_exchanging_msg_event_handle(as_exchange_event* msg_event)
{
	EXCHANGE_LOCK();

	if (!exchange_msg_is_sane(msg_event->msg_source, msg_event->msg)) {
		goto Exit;
	}

	as_exchange_msg_type msg_type;
	exchange_msg_type_get(msg_event->msg, &msg_type);

	switch (msg_type) {
	case AS_EXCHANGE_MSG_TYPE_DATA:
		exchange_exchanging_data_msg_handle(msg_event);
		break;
	case AS_EXCHANGE_MSG_TYPE_DATA_ACK:
		exchange_exchanging_data_ack_msg_handle(msg_event);
		break;
	default:
		DEBUG(
				"exchanging state received unexpected message of type %d from node %"PRIx64,
				msg_type, msg_event->msg_source);
	}

Exit:
	EXCHANGE_UNLOCK();
}

/**
 * Process a timer event when in exchanging state.
 */
static void
exchange_exchanging_timer_event_handle(as_exchange_event* msg_event)
{
	EXCHANGE_LOCK();
	bool send_data = false;

	cf_clock now = cf_getms();

	// The timeout is a "linear" step function, where the timeout is constant
	// within each step interval.
	cf_clock min_timeout = EXCHANGE_SEND_MIN_TIMEOUT();
	cf_clock max_timeout = EXCHANGE_SEND_MAX_TIMEOUT();
	uint32_t step_interval = EXCHANGE_SEND_STEP_INTERVAL();
	cf_clock timeout = MAX(min_timeout,
			MIN(max_timeout,
					min_timeout
							* ((now - g_exchange.send_ts) / step_interval)));

	if (g_exchange.send_ts + timeout < now) {
		send_data = true;
	}

	EXCHANGE_UNLOCK();

	if (send_data) {
		exchange_data_msg_send_pending_ack();
	}
}
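
/*
 * Worked example of the step timeout (hypothetical values): with a min
 * timeout of 100 ms, a max of 1000 ms and a step interval of 1000 ms, the
 * computed timeout is 100 ms until two step intervals have elapsed since
 * send_ts (the multiplier is 0, then 1, so the MAX clamp applies), 200 ms in
 * the third interval, 300 ms in the fourth, and so on, until the MIN clamp
 * caps it at 1000 ms.
 */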

/**
 * Event processing in the exchanging state.
 */
static void
exchange_exchanging_event_handle(as_exchange_event* event)
{
	switch (event->type) {
	case AS_EXCHANGE_EVENT_CLUSTER_CHANGE:
		exchange_clustering_event_handle(event);
		break;
	case AS_EXCHANGE_EVENT_MSG:
		exchange_exchanging_msg_event_handle(event);
		break;
	case AS_EXCHANGE_EVENT_TIMER:
		exchange_exchanging_timer_event_handle(event);
		break;
	}
}

/*
 * ----------------------------------------------------------------------------
 * Ready_To_Commit state event handling
 * ----------------------------------------------------------------------------
 */

/**
 * Handle incoming ready to commit message.
 *
 * Assumes the message has been checked for sanity.
 */
static void
exchange_ready_to_commit_rtc_msg_handle(as_exchange_event* msg_event)
{
	if (!exchange_self_is_principal()) {
		WARNING(
				"non-principal self received ready to commit message from %"PRIx64" - ignoring",
				msg_event->msg_source);
		return;
	}

	EXCHANGE_LOCK();

	DEBUG("received ready to commit from node %"PRIx64, msg_event->msg_source);

	as_exchange_node_state node_state;
	exchange_node_state_get_safe(msg_event->msg_source, &node_state);

	if (!node_state.is_ready_to_commit) {
		// Mark as ready to commit in the node state.
		node_state.is_ready_to_commit = true;
		exchange_node_state_update(msg_event->msg_source, &node_state);
	}
	else {
		// Duplicate ready to commit received. Ignore.
		INFO("received duplicate ready to commit message from node %"PRIx64,
				msg_event->msg_source);
	}

	cf_vector* node_vector = cf_vector_stack_create(cf_node);
	exchange_nodes_find_not_ready_to_commit(node_vector);

	if (cf_vector_size(node_vector) <= 0) {
		// Send a commit message to all nodes in the succession list.
		cf_node* node_list = NULL;
		int num_node_list = 0;
		cf_vector_to_stack_array(&g_exchange.succession_list, &node_list,
				&num_node_list);
		exchange_commit_msg_send_all(node_list, num_node_list);
	}

	cf_vector_destroy(node_vector);

	EXCHANGE_UNLOCK();
}

/**
 * Commit accumulated exchange data.
 */
static void
exchange_data_commit()
{
	EXCHANGE_LOCK();

	INFO("data exchange completed with cluster key %"PRIx64,
			g_exchange.cluster_key);

	// Exchange is done - use the current cluster details as the committed
	// cluster details.
	exchange_commited_cluster_update(g_exchange.cluster_key,
			&g_exchange.succession_list);

	// Force an update of the skew, to ensure any new nodes have been checked
	// for skew.
	as_skew_monitor_update();

	// Must cover partition balance since it may manipulate ns->cluster_size.
	pthread_mutex_lock(&g_exchanged_info_lock);
	as_partition_balance();
	pthread_mutex_unlock(&g_exchanged_info_lock);

	EXCHANGE_UNLOCK();
}

/**
 * Handle incoming commit message.
 *
 * Assumes the message has been checked for sanity.
 */
static void
exchange_ready_to_commit_commit_msg_handle(as_exchange_event* msg_event)
{
	EXCHANGE_LOCK();

	if (msg_event->msg_source != g_exchange.principal) {
		WARNING(
				"ignoring commit message from node %"PRIx64" - expected message from %"PRIx64,
				msg_event->msg_source, g_exchange.principal);
		goto Exit;
	}

	INFO("received commit command from principal node %"PRIx64,
			msg_event->msg_source);

	// Commit exchanged data.
	exchange_data_commit();

	// Move to the rest state.
	g_exchange.state = AS_EXCHANGE_STATE_REST;

	// Queue up a cluster change event for downstream sub-systems.
	as_exchange_cluster_changed_event cluster_change_event;

	EXCHANGE_COMMITTED_CLUSTER_RLOCK();
	cluster_change_event.cluster_key = g_exchange.committed_cluster_key;
	cluster_change_event.succession = vector_to_array(
			&g_exchange.committed_succession_list);
	cluster_change_event.cluster_size = g_exchange.committed_cluster_size;
	exchange_external_event_queue(&cluster_change_event);
	EXCHANGE_COMMITTED_CLUSTER_UNLOCK();

Exit:
	EXCHANGE_UNLOCK();
}

/**
 * Handle incoming data message in the ready to commit stage.
 *
 * Assumes the message has been checked for sanity.
 */
static void
exchange_ready_to_commit_data_msg_handle(as_exchange_event* msg_event)
{
	EXCHANGE_LOCK();

	DEBUG("received exchange data from node %"PRIx64, msg_event->msg_source);

	// The source must have missed self node's data ack. Send an
	// acknowledgement.
	exchange_data_ack_msg_send(msg_event->msg_source);

	EXCHANGE_UNLOCK();
}

/**
 * Process a message event when in ready_to_commit state.
 */
static void
exchange_ready_to_commit_msg_event_handle(as_exchange_event* msg_event)
{
	EXCHANGE_LOCK();

	if (!exchange_msg_is_sane(msg_event->msg_source, msg_event->msg)) {
		goto Exit;
	}

	as_exchange_msg_type msg_type;
	exchange_msg_type_get(msg_event->msg, &msg_type);

	switch (msg_type) {
	case AS_EXCHANGE_MSG_TYPE_READY_TO_COMMIT:
		exchange_ready_to_commit_rtc_msg_handle(msg_event);
		break;
	case AS_EXCHANGE_MSG_TYPE_COMMIT:
		exchange_ready_to_commit_commit_msg_handle(msg_event);
		break;
	case AS_EXCHANGE_MSG_TYPE_DATA:
		exchange_ready_to_commit_data_msg_handle(msg_event);
		break;
	default:
		DEBUG(
				"ready to commit state received unexpected message of type %d from node %"PRIx64,
				msg_type, msg_event->msg_source);
	}

Exit:
	EXCHANGE_UNLOCK();
}

/**
 * Process a timer event when in ready_to_commit state.
 */
static void
exchange_ready_to_commit_timer_event_handle(as_exchange_event* msg_event)
{
	EXCHANGE_LOCK();

	if (g_exchange.ready_to_commit_send_ts + EXCHANGE_READY_TO_COMMIT_TIMEOUT()
			< cf_getms()) {
		// It's been a while since ready to commit was sent to the principal.
		// Retransmit it so that the principal gets it this time and supplies
		// a commit message.
		exchange_ready_to_commit_msg_send();
	}

	EXCHANGE_UNLOCK();
}

/**
 * Event processing in the ready_to_commit state.
 */
static void
exchange_ready_to_commit_event_handle(as_exchange_event* event)
{
	switch (event->type) {
	case AS_EXCHANGE_EVENT_CLUSTER_CHANGE:
		exchange_clustering_event_handle(event);
		break;
	case AS_EXCHANGE_EVENT_MSG:
		exchange_ready_to_commit_msg_event_handle(event);
		break;
	case AS_EXCHANGE_EVENT_TIMER:
		exchange_ready_to_commit_timer_event_handle(event);
		break;
	}
}

/*
 * ----------------------------------------------------------------------------
 * Exchange core subsystem
 * ----------------------------------------------------------------------------
 */

/**
 * Dispatch an exchange event inline to the relevant state handler.
 */
static void
exchange_event_handle(as_exchange_event* event)
{
	EXCHANGE_LOCK();

	switch (g_exchange.state) {
	case AS_EXCHANGE_STATE_REST:
		exchange_rest_event_handle(event);
		break;
	case AS_EXCHANGE_STATE_EXCHANGING:
		exchange_exchanging_event_handle(event);
		break;
	case AS_EXCHANGE_STATE_READY_TO_COMMIT:
		exchange_ready_to_commit_event_handle(event);
		break;
	case AS_EXCHANGE_STATE_ORPHANED:
		exchange_orphan_event_handle(event);
		break;
	}

	EXCHANGE_UNLOCK();
}

/**
 * Exchange timer event generator thread, to help with retries and retransmits
 * across all states.
 */
static void*
exchange_timer_thr(void* arg)
{
	as_exchange_event timer_event;
	memset(&timer_event, 0, sizeof(timer_event));
	timer_event.type = AS_EXCHANGE_EVENT_TIMER;

	while (EXCHANGE_IS_RUNNING()) {
		// Wait for a while and retry.
		usleep(EXCHANGE_TIMER_TICK_INTERVAL() * 1000);
		exchange_event_handle(&timer_event);
	}

	return NULL;
}

/**
 * Handle incoming messages from fabric.
 */
static int
exchange_fabric_msg_listener(cf_node source, msg* msg, void* udata)
{
	if (!EXCHANGE_IS_RUNNING()) {
		// Ignore this message.
		DEBUG("exchange stopped - ignoring message from %"PRIx64, source);
		goto Exit;
	}

	as_exchange_event msg_event;
	memset(&msg_event, 0, sizeof(msg_event));
	msg_event.type = AS_EXCHANGE_EVENT_MSG;
	msg_event.msg = msg;
	msg_event.msg_source = source;

	exchange_event_handle(&msg_event);

Exit:
	as_fabric_msg_put(msg);
	return 0;
}

/**
 * Listener for cluster change events from the clustering layer.
 */
void
exchange_clustering_event_listener(as_clustering_event* event)
{
	if (!EXCHANGE_IS_RUNNING()) {
		// Ignore this event.
		DEBUG("exchange stopped - ignoring cluster change event");
		return;
	}

	as_exchange_event clustering_event;
	memset(&clustering_event, 0, sizeof(clustering_event));
	clustering_event.type = AS_EXCHANGE_EVENT_CLUSTER_CHANGE;
	clustering_event.clustering_event = event;

	// Dispatch the event.
	exchange_event_handle(&clustering_event);
}

/**
 * Initialize the template to be used for exchange messages.
 */
static void
exchange_msg_init()
{
	// Register the fabric exchange msg type and its incoming-message handler.
	as_fabric_register_msg_fn(M_TYPE_EXCHANGE, exchange_msg_template,
			sizeof(exchange_msg_template), AS_EXCHANGE_MSG_SCRATCH_SIZE,
			exchange_fabric_msg_listener, NULL);
}

/**
 * Initialize exchange subsystem.
 */
static void
exchange_init()
{
	if (EXCHANGE_IS_INITIALIZED()) {
		return;
	}

	EXCHANGE_LOCK();

	memset(&g_exchange, 0, sizeof(g_exchange));

	// Start in the orphaned state.
	g_exchange.state = AS_EXCHANGE_STATE_ORPHANED;
	g_exchange.orphan_state_start_time = cf_getms();
	g_exchange.orphan_state_are_transactions_blocked = true;

	// Initialize the node-id to node-state hash.
	g_exchange.nodeid_to_node_state = cf_shash_create(cf_nodeid_shash_fn,
			sizeof(cf_node), sizeof(as_exchange_node_state),
			AS_EXCHANGE_CLUSTER_MAX_SIZE_SOFT, 0);

	cf_vector_init(&g_exchange.succession_list, sizeof(cf_node),
			AS_EXCHANGE_CLUSTER_MAX_SIZE_SOFT, VECTOR_FLAG_INITZERO);

	EXCHANGE_COMMITTED_CLUSTER_WLOCK();
	cf_vector_init(&g_exchange.committed_succession_list, sizeof(cf_node),
			AS_EXCHANGE_CLUSTER_MAX_SIZE_SOFT, VECTOR_FLAG_INITZERO);
	EXCHANGE_COMMITTED_CLUSTER_UNLOCK();

	// Initialize exchange fabric messaging.
	exchange_msg_init();

	// Initialize self exchange data dynamic buffers.
	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		cf_dyn_buf_init_heap(&g_exchange.self_data_dyn_buf[ns_ix],
				AS_EXCHANGE_SELF_DYN_BUF_SIZE());
	}

	// Initialize external event publishing.
	exchange_external_event_publisher_init();

	// Get partition versions from storage.
	as_partition_balance_init();

	DEBUG("exchange module initialized");

	EXCHANGE_UNLOCK();
}

/**
 * Stop exchange subsystem.
 */
static void
exchange_stop()
{
	if (!EXCHANGE_IS_RUNNING()) {
		WARNING("exchange is already stopped");
		return;
	}

	// Unguarded state, but this should be ok.
	g_exchange.sys_state = AS_EXCHANGE_SYS_STATE_SHUTTING_DOWN;

	cf_thread_join(g_exchange.timer_tid);

	EXCHANGE_LOCK();

	g_exchange.sys_state = AS_EXCHANGE_SYS_STATE_STOPPED;

	DEBUG("exchange module stopped");

	EXCHANGE_UNLOCK();

	external_event_publisher_stop();
}

/**
 * Start the exchange subsystem.
 */
static void
exchange_start()
{
	EXCHANGE_LOCK();

	if (EXCHANGE_IS_RUNNING()) {
		// Stop the exchange subsystem before restarting it.
		exchange_stop();
	}

	g_exchange.sys_state = AS_EXCHANGE_SYS_STATE_RUNNING;

	g_exchange.timer_tid = cf_thread_create_joinable(exchange_timer_thr,
			(void*)&g_exchange);

	DEBUG("exchange module started");

	EXCHANGE_UNLOCK();

	exchange_external_event_publisher_start();
}

/*
 * ----------------------------------------------------------------------------
 * Public API
 * ----------------------------------------------------------------------------
 */

/**
 * Initialize exchange subsystem.
 */
void
as_exchange_init()
{
	exchange_init();
}

/**
 * Start exchange subsystem.
 */
void
as_exchange_start()
{
	exchange_start();
}

/**
 * Stop exchange subsystem.
 */
void
as_exchange_stop()
{
	exchange_stop();
}

/**
 * Register to receive cluster-changed events.
 * TODO - may replace with simple static list someday.
 */
void
as_exchange_register_listener(as_exchange_cluster_changed_cb cb, void* udata)
{
	exchange_external_event_listener_register(cb, udata);
}
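
/*
 * Illustrative usage (not part of this module) - a minimal listener. The
 * callback signature is assumed from the as_exchange_cluster_changed_event
 * queued in exchange_ready_to_commit_commit_msg_handle():
 *
 *   static void
 *   my_cluster_changed_cb(const as_exchange_cluster_changed_event* event,
 *           void* udata)
 *   {
 *       INFO("cluster key %"PRIx64" size %u", event->cluster_key,
 *               event->cluster_size);
 *   }
 *
 *   as_exchange_register_listener(my_cluster_changed_cb, NULL);
 */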

/**
 * Dump exchange state to log.
 */
void
as_exchange_dump(bool verbose)
{
	exchange_dump(CF_INFO, verbose);
}

/**
 * Member-access method.
 */
uint64_t
as_exchange_cluster_key()
{
	return (uint64_t)g_exchange.committed_cluster_key;
}

/**
 * Member-access method.
 */
uint32_t
as_exchange_cluster_size()
{
	return g_exchange.committed_cluster_size;
}

/**
 * Copy over the committed succession list.
 * Ensure the input vector has enough capacity.
 */
void
as_exchange_succession(cf_vector* succession)
{
	EXCHANGE_COMMITTED_CLUSTER_RLOCK();
	vector_copy(succession, &g_exchange.committed_succession_list);
	EXCHANGE_COMMITTED_CLUSTER_UNLOCK();
}

/**
 * Return the committed succession list. For internal use within the scope of
 * an exchange_data_commit function call.
 */
cf_node*
as_exchange_succession_unsafe()
{
	return vector_to_array(&g_exchange.committed_succession_list);
}

/**
 * Return the committed succession list as a string in a dyn-buf.
 */
void
as_exchange_info_get_succession(cf_dyn_buf* db)
{
	EXCHANGE_COMMITTED_CLUSTER_RLOCK();

	cf_node* nodes = vector_to_array(&g_exchange.committed_succession_list);

	for (uint32_t i = 0; i < g_exchange.committed_cluster_size; i++) {
		cf_dyn_buf_append_uint64_x(db, nodes[i]);
		cf_dyn_buf_append_char(db, ',');
	}

	if (g_exchange.committed_cluster_size != 0) {
		cf_dyn_buf_chomp(db);
	}

	// Always succeeds.
	cf_dyn_buf_append_string(db, "\nok");

	EXCHANGE_COMMITTED_CLUSTER_UNLOCK();
}
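
/*
 * Sample output (hypothetical node-ids):
 *
 *   bb9040011ac4202,bb9020011ac4202,bb9010011ac4202
 *   ok
 *
 * An empty committed succession list yields just "\nok".
 */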

/**
 * Member-access method.
 */
cf_node
as_exchange_principal()
{
	return g_exchange.committed_principal;
}

/**
 * Used by exchange listeners during upgrades for compatibility purposes.
 */
uint32_t*
as_exchange_compatibility_ids(void)
{
	return (uint32_t*)g_exchange.compatibility_ids;
}

/**
 * Used by exchange listeners during upgrades for compatibility purposes.
 */
uint32_t
as_exchange_min_compatibility_id(void)
{
	return g_exchange.min_compatibility_id;
}

/**
 * Exchange cluster state output for info calls.
 */
void
as_exchange_cluster_info(cf_dyn_buf* db)
{
	EXCHANGE_COMMITTED_CLUSTER_RLOCK();
	info_append_uint32(db, "cluster_size", g_exchange.committed_cluster_size);
	info_append_uint64_x(db, "cluster_key", g_exchange.committed_cluster_key);
	info_append_uint64(db, "cluster_generation",
			g_exchange.committed_cluster_generation);
	info_append_uint64_x(db, "cluster_principal",
			g_exchange.committed_principal);
	EXCHANGE_COMMITTED_CLUSTER_UNLOCK();
}
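
/*
 * Sample output (hypothetical values), assuming the info_append_* helpers
 * emit semicolon-terminated name=value pairs:
 *
 *   cluster_size=3;cluster_key=1f60a83e84e6a1aa;cluster_generation=7;
 *   cluster_principal=bb9040011ac4202;
 */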

/**
 * Lock before setting or getting exchanged info from a non-exchange thread.
 */
void
as_exchange_info_lock()
{
	pthread_mutex_lock(&g_exchanged_info_lock);
}

/**
 * Unlock after setting or getting exchanged info from a non-exchange thread.
 */
void
as_exchange_info_unlock()
{
	pthread_mutex_unlock(&g_exchanged_info_lock);
}