1/*
2 * clustering.c
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23#include "fabric/clustering.h"
24
25#include <errno.h>
26#include <math.h>
27#include <stdio.h>
28#include <unistd.h>
29#include <sys/param.h> // For MAX() and MIN().
30
31#include "citrusleaf/alloc.h"
32#include "citrusleaf/cf_clock.h"
33#include "citrusleaf/cf_random.h"
34
35#include "cf_thread.h"
36#include "fault.h"
37#include "msg.h"
38#include "node.h"
39#include "shash.h"
40
41#include "base/cfg.h"
42#include "fabric/fabric.h"
43#include "fabric/hlc.h"
44
45/*
46 * Overview
47 * ========
48 * Clustering v5 implementation based on the design at
49 * https://aerospike.atlassian.net/wiki/pages/viewpage.action?spaceKey=DEV&title=Central+Wiki%3A++Clustering+V5
50 *
51 * Public and private view of the cluster
52 * =======================================
 * This clustering algorithm introduces an orphan state, in which this node is
 * not part of a cluster, but is looking to form/join a cluster. During this
 * transitional phase, the public view of the cluster - the tuple <cluster_key,
 * succession_list> - does not change from the last view. However the internal
 * view, which is published along with the heartbeat messages, is set to <0,
 * []>.
59 *
60 * This ensures clients continue to function, (maybe with errors), during the
61 * transition from orphan to part of a cluster state. This is in line with the
62 * clustering v4 and prior behaviour.
63 *
64 * TODO: (revise)
65 *
66 * Deviations from paxos
67 * =====================
68 *
69 * Accepted value
70 * ---------------
71 *
 * The accepted value is not sent along with the accept and accepted messages.
 * The latest accepted value overwrites the previous value at a node. In paxos,
 * if a node has already accepted a value, it is sent back to the proposer who
 * should use the value with the highest proposal id as the final value. The
 * proposer generates the final consensus value as the succession list with the
 * nodes that have both returned promise and accepted replies.
78 *
 * This is not safe in terms of achieving a single paxos value, however it is
 * safe in that nodes courted by other principals will get filtered out during
 * paxos and not require additional paxos rounds.
82 *
 * It is still possible that the final consensus succession list might have a
 * few nodes moving out owing to a neighboring principal. However the faulty
 * node check in the next quantum interval will fix this.
86 *
87 * Quorum
88 * ------
 * The prepare phase uses a majority quorum for the promise messages, to speed
 * through the paxos round. However the accept phase uses a complete / full
 * quorum for accepted messages. This helps with ensuring that when a node
 * generates a cluster change event all cluster members have applied the
 * current cluster membership.
94 *
95 * Design
96 * ======
 * The clustering sub-system interacts with the rest of Aerospike via input
 * event notifications (primarily heartbeat events) and output event
 * notifications (primarily cluster change notifications).
100 *
 * The subsystem is driven by internal events (that also encapsulate external
 * input event notifications) like timer, quantum interval start, adjacency
 * changed, message received, etc.
104 *
105 * The clustering-v5 subsystem is further organized as the following sub-modules
106 * each of which reacts to the above mentioned events based on individual state
107 * transition diagrams.
108 *
109 * 1. Timer
110 * 2. Quantum interval generator
111 * 3. Paxos proposer
112 * 4. Paxos acceptor
113 * 5. Register
114 * 6. External event publisher
115 * 7. Internal event dispatcher
116 * 8. Clustering main
117 *
118 * The sub modules also interact with each other via inline internal event
119 * dispatch and handling.
120 *
121 * Timer
122 * -----
123 * Generates timer events that serve as the internal tick/clock for the
124 * clustering-v5 sub system. Other sub-modules use the timer events to drive
125 * actions to be performed at fixed intervals, for e.g. message retransmits.
126 *
127 * Quantum interval generator
128 * --------------------------
129 * Generates quantum interval start events, at which cluster change decision are
130 * taken.
131 *
132 * Paxos proposer
133 * --------------
134 * The paxos proposer proposes a cluster change. The node may or may not be the
135 * eventual principal for the cluster.
136 *
137 * Paxos acceptor
138 * --------------
 * Participates in voting for a proposal. A paxos proposer is also necessarily
 * an acceptor in this design.
141 *
142 * Register
143 * --------
144 * Holds current cluster membership and cluster key. It is responsible for
145 * ensuring all cluster members have their registers in sync before publishing
146 * an external cluster change event.
147 *
148 * External event publisher
149 * ------------------------
150 * Generate and publishes external events or cluster changes. Runs as a separate
151 * thread to prevent interference and potential deadlocks with the clustering
152 * subsystem.
153 *
154 * Internal event dispatcher
155 * -------------------------
 * Dispatches internal events to the current handler function based on the
 * event type and current state.
158 *
159 * Clustering main
160 * ---------------
161 * Monitors the cluster and triggers cluster changes.
162 *
163 * State transitions
164 * =================
165 * TODO: diagrams for each sub-module
166 *
167 * Message send rules
168 * ==================
169 * Message send should preferably be outside the main clustering lock and should
170 * not be followed by any state change in the same function. This is because
171 * fabric relays messages to self inline in the send call itself which can lead
172 * to corruption if the message handler involves a state change as well or can
173 * result in the message handler seeing inconsistent partially updated state.
174 */
175
176/*
177 * ----------------------------------------------------------------------------
178 * Constants
179 * ----------------------------------------------------------------------------
180 */
181
182/**
183 * A soft limit for the maximum cluster size. Meant to be optimize hash and list
184 * data structures and not as a limit on the number of nodes.
185 */
186#define AS_CLUSTERING_CLUSTER_MAX_SIZE_SOFT 200
187
188/**
189 * Timer event generation interval.
190 */
191#define CLUSTERING_TIMER_TICK_INTERVAL 75
192
193/**
194 * Maximum time paxos round would take for completion. 3 RTTs paxos message
195 * exchanges and 1 RTT as a buffer.
196 */
197#define PAXOS_COMPLETION_TIME_MAX (4 * network_rtt_max())
198
199/**
200 * Maximum quantum interval duration, should be at least two heartbeat
201 * intervals, to ensure there is at least one exchange of clustering information
202 * over heartbeats.
203 */
204#define QUANTUM_INTERVAL_MAX MAX(5000, 2 * as_hb_tx_interval_get())
205
206/**
207 * Block size for allocating node plugin data. Ensure the allocation is in
208 * multiples of 128 bytes, allowing expansion to 16 nodes without reallocating.
209 */
210#define HB_PLUGIN_DATA_BLOCK_SIZE 128
211
212/**
213 * Scratch size for clustering messages.
214 *
215 * TODO: Compute this properly.
216 */
217#define AS_CLUSTERING_MSG_SCRATCH_SIZE 1024
218
219/**
220 * Majority value for preferred principal to be selected for move. Use tow
221 * thirds as the majority value.
222 */
223#define AS_CLUSTERING_PREFERRRED_PRINCIPAL_MAJORITY (2 / 3)
224
225/*
226 * ----------------------------------------------------------------------------
227 * Paxos data structures
228 * ----------------------------------------------------------------------------
229 */
230
231/**
232 * Paxos sequence number. We will use the hybrid logical clock timestamp as
233 * sequence numbers, to ensure node restarts do not reset the sequence number
234 * back to zero and sequence numbers are monotoniocally increasing. A sequence
235 * number value of zero is invalid.
236 */
237typedef as_hlc_timestamp as_paxos_sequence_number;
238
239/**
240 * Paxos proposal identifier.
241 * Note: The nodeid can be skipped when sending the proposal id over the wire
242 * and can be inferred from the source duirng paxos message exchanges.
243 */
typedef struct as_paxos_proposal_id_s
{
	/**
	 * The sequence number.
	 */
	as_paxos_sequence_number sequence_number;

	/**
	 * The proposing node's nodeid, used to break ties between proposals
	 * carrying the same sequence number.
	 */
	cf_node src_nodeid;
} as_paxos_proposal_id;
256
257/**
258 * The proposed cluster membership.
259 */
260typedef struct as_paxos_proposed_value_s
261{
262 /**
263 * The cluster key.
264 */
265 as_cluster_key cluster_key;
266
267 /**
268 * The succession list.
269 */
270 cf_vector succession_list;
271} as_paxos_proposed_value;
272
273/**
274 * Paxos acceptor state.
275 */
276typedef enum
277{
278 /**
279 * Acceptor is idel with no active paxos round.
280 */
281 AS_PAXOS_ACCEPTOR_STATE_IDLE,
282
283 /**
284 * Acceptor has received and acked a promise message.
285 */
286 AS_PAXOS_ACCEPTOR_STATE_PROMISED,
287
288 /**
289 * Acceptor has received and accepted an accept message from a proposer.
290 */
291 AS_PAXOS_ACCEPTOR_STATE_ACCEPTED
292} as_paxos_acceptor_state;
293
294/**
295 * Data tracked by the node in the role of a paxos acceptor.
296 * All nodes are paxos acceptors.
297 */
298typedef struct as_paxos_acceptor_s
299{
300 /**
301 * The paxos acceptor state.
302 */
303 as_paxos_acceptor_state state;
304
305 /**
306 * Monotonic timestamp when the first message for current proposal was
307 * received from the proposer.
308 */
309 cf_clock acceptor_round_start;
310
311 /**
312 * Monotonic timestamp when the promise message was sent.
313 */
314 cf_clock promise_send_time;
315
316 /**
317 * Monotonic timestamp when the promise message was sent.
318 */
319 cf_clock accepted_send_time;
320
321 /**
322 * Id of the last proposal, promised or accepted by this node.
323 */
324 as_paxos_proposal_id last_proposal_received_id;
325} as_paxos_acceptor;
326
327/**
328 * State of a paxos proposer.
329 */
330typedef enum as_paxos_proposer_state_e
331{
332 /**
333 * Paxos proposer is idle. No pending paxos rounds.
334 */
335 AS_PAXOS_PROPOSER_STATE_IDLE,
336
337 /**
338 * Paxos proposer sent out a prepare message.
339 */
340 AS_PAXOS_PROPOSER_STATE_PREPARE_SENT,
341
342 /**
343 * Paxos proposer has sent out an accept message.
344 */
345 AS_PAXOS_PROPOSER_STATE_ACCEPT_SENT
346} as_paxos_proposer_state;
347
348/**
349 * Data tracked by the node in the role of a paxos proposer. The proposer node
350 * may or may not be the current or eventual principal.
351 */
352typedef struct as_paxos_proposer_s
353{
354 /**
355 * The state of the proposer.
356 */
357 as_paxos_proposer_state state;
358
359 /**
360 * The sequence number / id for the last proposed paxos value.
361 */
362 as_paxos_sequence_number sequence_number;
363
364 /**
365 * The proposed cluster value.
366 */
367 as_paxos_proposed_value proposed_value;
368
369 /**
370 * The time current paxos round was started.
371 */
372 cf_clock paxos_round_start_time;
373
374 /**
375 * The time current proposal's prepare message was sent.
376 */
377 cf_clock prepare_send_time;
378
379 /**
380 * The time current proposal's accept message was sent.
381 */
382 cf_clock accept_send_time;
383
384 /**
385 * The time current proposal's learn message was sent.
386 */
387 cf_clock learn_send_time;
388
389 /**
390 * Indicates if learn message needs retransmit.
391 */
392 bool learn_retransmit_needed;
393
394 /**
395 * The set of acceptor nodes including self.
396 */
397 cf_vector acceptors;
398
399 /**
400 * Set of nodeids that send out a promise response to the current prepare
401 * message.
402 */
403 cf_vector promises_received;
404
405 /**
406 * Set of nodeids that send out an accepted response to the current accept
407 * message.
408 */
409 cf_vector accepted_received;
410} as_paxos_proposer;
411
412/**
413 * Result of paxos round start call.
414 */
415typedef enum as_paxos_start_result_e
416{
417 /**
418 * Paxos round started successfully.
419 */
420 AS_PAXOS_RESULT_STARTED,
421
422 /**
423 * cluster size is less than minimum required cluster size.
424 */
425 AS_PAXOS_RESULT_CLUSTER_TOO_SMALL,
426
427 /**
428 * Paxos round already in progress. Paxos not started.
429 */
430 AS_PAXOS_RESULT_ROUND_RUNNING
431} as_paxos_start_result;
432
433/**
434 * Node clustering status.
435 */
436typedef enum
437{
438 /**
439 * Peer node is orphaned.
440 */
441 AS_NODE_ORPHAN,
442
443 /**
444 * Peer node has a cluster assigned.
445 */
446 AS_NODE_CLUSTER_ASSIGNED,
447
448 /**
449 * Peer node status is unknown.
450 */
451 AS_NODE_UNKNOWN
452} as_clustering_peer_node_state;
453
454/*
455 * ----------------------------------------------------------------------------
456 * Clustering data structures
457 * ----------------------------------------------------------------------------
458 */
459
460/**
461 * Clustering message types.
462 */
463typedef enum
464{
465 /*
466 * ---- Clustering management messages ----
467 */
468 AS_CLUSTERING_MSG_TYPE_JOIN_REQUEST,
469 AS_CLUSTERING_MSG_TYPE_JOIN_REJECT,
470 AS_CLUSTERING_MSG_TYPE_MERGE_MOVE,
471 AS_CLUSTERING_MSG_TYPE_CLUSTER_CHANGE_APPLIED,
472
473 /*
474 * ---- Paxos messages ----
475 */
476 AS_CLUSTERING_MSG_TYPE_PAXOS_PREPARE,
477 AS_CLUSTERING_MSG_TYPE_PAXOS_PROMISE,
478 AS_CLUSTERING_MSG_TYPE_PAXOS_PREPARE_NACK,
479 AS_CLUSTERING_MSG_TYPE_PAXOS_ACCEPT,
480 AS_CLUSTERING_MSG_TYPE_PAXOS_ACCEPTED,
481 AS_CLUSTERING_MSG_TYPE_PAXOS_ACCEPT_NACK,
482 AS_CLUSTERING_MSG_TYPE_PAXOS_LEARN,
483} as_clustering_msg_type;
484
485/**
486 * The fields in the clustering message.
487 */
488typedef enum
489{
490 /**
491 * Clustering message identifier.
492 */
493 AS_CLUSTERING_MSG_ID,
494
495 /**
496 * Clustering message type.
497 */
498 AS_CLUSTERING_MSG_TYPE,
499
500 /**
501 * The source node send timestamp.
502 */
503 AS_CLUSTERING_MSG_HLC_TIMESTAMP,
504
505 /**
506 * The paxos sequence number. Not all messages will have this.
507 */
508 AS_CLUSTERING_MSG_SEQUENCE_NUMBER,
509
510 /**
511 * The proposed cluster key. Only part of the paxos accept message.
512 */
513 AS_CLUSTERING_MSG_CLUSTER_KEY,
514
515 /**
516 * The proposed succession list. Only part of the paxos accept message.
517 */
518 AS_CLUSTERING_MSG_SUCCESSION_LIST,
519
520 /**
521 * The proposed principal relevant only to cluster move commands, which will
522 * merge two well formed paxos clusters.
523 */
524 AS_CLUSTERING_MSG_PROPOSED_PRINCIPAL,
525
526 /**
527 * Sentinel value to keep track of the number of message fields.
528 */
529 AS_CLUSTERING_MGS_SENTINEL
530} as_clustering_msg_field;
531
532/**
533 * Internal clustering event type.
534 */
535typedef enum
536{
537 /**
538 * Timer event.
539 */
540 AS_CLUSTERING_INTERNAL_EVENT_TIMER,
541
542 /**
543 * Incoming message event.
544 */
545 AS_CLUSTERING_INTERNAL_EVENT_MSG,
546
547 /**
548 * A join request was accepted.
549 */
550 AS_CLUSTERING_INTERNAL_EVENT_JOIN_REQUEST_ACCEPTED,
551
552 /**
553 * Indicates the start of a quantum interval.
554 */
555 AS_CLUSTERING_INTERNAL_EVENT_QUANTUM_INTERVAL_START,
556
557 /**
558 * Indicates that self node's cluster membership changed.
559 */
560 AS_CLUSTERING_INTERNAL_EVENT_REGISTER_CLUSTER_CHANGED,
561
562 /**
563 * Indicates that self node's cluster membership has been synced across all
564 * cluster members.
565 */
566 AS_CLUSTERING_INTERNAL_EVENT_REGISTER_CLUSTER_SYNCED,
567
568 /**
569 * Indicates that self node has been marked as an orphan.
570 */
571 AS_CLUSTERING_INTERNAL_EVENT_REGISTER_ORPHANED,
572
573 /**
574 * Indicates an incoming heartbeat event.
575 */
576 AS_CLUSTERING_INTERNAL_EVENT_HB,
577
578 /**
579 * Indicates that plugin data for a node has changed.
580 */
581 AS_CLUSTERING_INTERNAL_EVENT_HB_PLUGIN_DATA_CHANGED,
582
583 /**
584 * The paxos round being accepted succeeded and the proposed value should be
585 * committed.
586 * This implies that all the proposed cluster members have all agreed on the
587 * proposed cluster key and the proposed cluster membership.
588 */
589 AS_CLUSTERING_INTERNAL_EVENT_PAXOS_ACCEPTOR_SUCCESS,
590
591 /**
592 * The last paxos round being accepted failed.
593 */
594 AS_CLUSTERING_INTERNAL_EVENT_PAXOS_ACCEPTOR_FAIL,
595
596 /**
597 * The paxos round proposed by this node.
598 */
599 AS_CLUSTERING_INTERNAL_EVENT_PAXOS_PROPOSER_SUCCESS,
600
601 /**
602 * The last paxos round proposed failed.
603 */
604 AS_CLUSTERING_INTERNAL_EVENT_PAXOS_PROPOSER_FAIL,
605} as_clustering_internal_event_type;
606
607/**
608 * An event used internally by the clustering subsystem.
609 */
610typedef struct as_clustering_internal_event_s
611{
612 /**
613 * The event type.
614 */
615 as_clustering_internal_event_type type;
616
617 /**
618 * The event qualifier.
619 */
620 as_clustering_event_qualifier qualifier;
621
622 /*
623 * ----- Quantum interval start event related fields
624 */
625 /**
626 * Indicates if this quantum interval start can be skipped by the event
627 * handler.
628 */
629 bool quantum_interval_is_skippable;
630
631 /*
632 * ----- Message event related fields.
633 */
634 /**
635 * The source node id.
636 */
637 cf_node msg_src_nodeid;
638
639 /**
640 * Incoming message type.
641 */
642 as_clustering_msg_type msg_type;
643
644 /**
645 * The hlc timestamp for message receipt.
646 */
647 as_hlc_msg_timestamp msg_hlc_ts;
648
649 /**
650 * Local monotonic received timestamp.
651 */
652 cf_clock msg_recvd_ts;
653
654 /**
655 * The received message.
656 */
657 msg* msg;
658
659 /*
660 * ----- HB event related fields.
661 */
662 /**
663 * Number of heartbeat events.
664 */
665 int hb_n_events;
666
667 /**
668 * Heartbeat events.
669 */
670 as_hb_event_node* hb_events;
671
672 /*
673 * ----- HB plugin data changed event related fields.
674 */
675 /**
676 * Node id of the node whose plugin data has changed.
677 */
678 cf_node plugin_data_changed_nodeid;
679
680 /**
681 * Node's plugin data.
682 */
683 as_hb_plugin_node_data* plugin_data;
684
685 /**
686 * The hlc timestamp for message receipt.
687 */
688 as_hlc_msg_timestamp plugin_data_changed_hlc_ts;
689
690 /**
691 * Local monotonic received timestamp.
692 */
693 cf_clock plugin_data_changed_ts;
694
695 /*
696 * ----- Join request handled related fields.
697 */
698 cf_node join_request_source_nodeid;
699
700 /*
701 * ----- Paxos success related fields.
702 */
703 /**
704 * New succession list.
705 */
706 cf_vector *new_succession_list;
707
708 /**
709 * New cluster key.
710 */
711 as_cluster_key new_cluster_key;
712
713 /**
714 * New paxos sequence number.
715 */
716 as_paxos_sequence_number new_sequence_number;
717} as_clustering_internal_event;
718
719/**
720 * The clustering timer state.
721 */
722typedef struct as_clustering_timer_s
723{
724 /**
725 * The timer thread id.
726 */
727 pthread_t timer_tid;
728} as_clustering_timer;
729
730/**
731 * Clustering subsystem state.
732 */
733typedef enum
734{
735 AS_CLUSTERING_SYS_STATE_UNINITIALIZED,
736 AS_CLUSTERING_SYS_STATE_RUNNING,
737 AS_CLUSTERING_SYS_STATE_SHUTTING_DOWN,
738 AS_CLUSTERING_SYS_STATE_STOPPED
739} as_clustering_sys_state;
740
741/**
742 * Type of quantum interval fault. Ensure the vtable in quantum iterval table is
743 * updated for each type.
744 */
typedef enum as_clustering_quantum_fault_type_e
{
	/**
	 * A new node arrived.
	 */
	QUANTUM_FAULT_NODE_ARRIVED,

	/**
	 * A node that is not our principal departed from the cluster.
	 */
	QUANTUM_FAULT_NODE_DEPARTED,

	/**
	 * We are in a cluster and our principal departed.
	 */
	QUANTUM_FAULT_PRINCIPAL_DEPARTED,

	/**
	 * A member node's adjacency list has changed.
	 */
	QUANTUM_FAULT_PEER_ADJACENCY_CHANGED,

	/**
	 * Join request accepted.
	 */
	QUANTUM_FAULT_JOIN_ACCEPTED,

	/**
	 * We have seen a principal who might send us a merge request.
	 */
	QUANTUM_FAULT_INBOUND_MERGE_CANDIDATE_SEEN,

	/**
	 * A node in our cluster has been orphaned.
	 */
	QUANTUM_FAULT_CLUSTER_MEMBER_ORPHANED,

	/**
	 * Sentinel value. Should be the last in the enum.
	 */
	QUANTUM_FAULT_TYPE_SENTINEL
} as_clustering_quantum_fault_type;
787
788/**
789 * Fault information for for first fault event detected in a quantum interval.
790 */
typedef struct as_clustering_quantum_fault_s
{
	/**
	 * First time the fault event was detected in the current quantum, based on
	 * the monotonic clock. Should be initialized to zero at quantum start /
	 * end.
	 */
	cf_clock event_ts;

	/**
	 * Last time the fault event was detected in the current quantum, based on
	 * the monotonic clock. Should be initialized to zero at quantum start /
	 * end.
	 */
	cf_clock last_event_ts;
} as_clustering_quantum_fault;
805
806/**
807 * Function to determine the minimum wait time after given fault happens.
808 */
809typedef uint32_t
810(as_clustering_quantum_fault_wait_fn)(as_clustering_quantum_fault* fault);
811
812/**
813 * Vtable for different types of faults.
814 */
815typedef struct as_clustering_quantum_fault_vtable_s
816{
817 /**
818 * String used to log this fault type.
819 */
820 char *fault_log_str;
821
822 /**
823 * Function providing the wait time for this fault type.
824 */
825 as_clustering_quantum_fault_wait_fn* wait_fn;
826} as_clustering_quantum_fault_vtable;
827
828/**
829 * Generates quantum intervals.
830 */
831typedef struct as_clustering_quantum_interval_generator_s
832{
833 /**
834 * Quantum interval fault vtable.
835 */
836 as_clustering_quantum_fault_vtable vtable[QUANTUM_FAULT_TYPE_SENTINEL];
837
838 /**
839 * Quantum interval faults.
840 */
841 as_clustering_quantum_fault fault[QUANTUM_FAULT_TYPE_SENTINEL];
842
843 /**
844 * Time quantum interval last started.
845 */
846 cf_clock last_quantum_start_time;
847
848 /**
849 * For quantum interval being skippable respect the last quantum interval
850 * since quantum_interval() will be affected by changes to hb config.
851 */
852 uint32_t last_quantum_interval;
853
854 /**
855 * Indicates if current quantum interval should be postponed.
856 */
857 bool is_interval_postponed;
858} as_clustering_quantum_interval_generator;
859
860/**
861 * State of the clustering register.
862 */
863typedef enum
864{
865 /**
866 * The register contents are in synced with all cluster members.
867 */
868 AS_CLUSTERING_REGISTER_STATE_SYNCED,
869
870 /**
871 * The register contents are being synced with other cluster members.
872 */
873 AS_CLUSTERING_REGISTER_STATE_SYNCING
874} as_clustering_register_state;
875
876/**
877 * Stores current cluster key and succession list and generates external events.
878 */
879typedef struct as_clustering_register_s
880{
881 /**
882 * The register state.
883 */
884 as_clustering_register_state state;
885
886 /**
887 * Current cluster key.
888 */
889 as_cluster_key cluster_key;
890
891 /**
892 * Current succession list.
893 */
894 cf_vector succession_list;
895
896 /**
897 * Indicates if this node has transitioned to orphan state after being in a
898 * valid cluster.
899 */
900 bool has_orphan_transitioned;
901
902 /**
903 * The sequence number for the current cluster.
904 */
905 as_paxos_sequence_number sequence_number;
906
907 /**
908 * Nodes pending sync.
909 */
910 cf_vector sync_pending;
911
912 /**
913 * Nodes that send a sync applied for an unexpected cluster. Store it in
914 * case this is an imminent cluster change we will see in the future. All
915 * the nodes in this vector have sent the same cluster key and the same
916 * succession list.
917 */
918 cf_vector ooo_change_applied_received;
919
920 /**
921 * Cluster key sent by nodes in ooo_change_applied_received vector.
922 */
923 as_cluster_key ooo_cluster_key;
924
925 /**
926 * Succession sent by nodes in ooo_change_applied_received vector.
927 */
928 cf_vector ooo_succession_list;
929
930 /**
931 * Timestamp of the first ooo change applied message.
932 */
933 as_hlc_timestamp ooo_hlc_timestamp;
934
935 /**
936 * The time cluster last changed.
937 */
938 as_hlc_timestamp cluster_modified_hlc_ts;
939
940 /**
941 * The monotonic clock time cluster last changed.
942 */
943 cf_clock cluster_modified_time;
944
945 /**
946 * The last time the register sync was checked in the syncing state.
947 */
948 cf_clock last_sync_check_time;
949} as_clustering_register;
950
951/**
952 * * Clustering state.
953 */
typedef enum
{
	/**
	 * Self node is not part of a cluster.
	 */
	AS_CLUSTERING_STATE_ORPHAN,

	/**
	 * Self node is the principal of a cluster.
	 */
	AS_CLUSTERING_STATE_PRINCIPAL,

	/**
	 * Self node is part of a cluster but not the principal.
	 */
	AS_CLUSTERING_STATE_NON_PRINCIPAL
} as_clustering_state;
971
972/**
973 * Clustering state maintained by this node.
974 */
975typedef struct as_clustering_s
976{
977
978 /**
979 * Clustering submodule state, indicates if the clustering sub system is
980 * running, stopped or initialized.
981 */
982 as_clustering_sys_state sys_state;
983
984 /**
985 * Simple view of whether or not the cluster is well-formed.
986 */
987 bool has_integrity;
988
989 /**
990 * Clustering relevant state, e.g. orphan, principal, non-principal.
991 */
992 as_clustering_state state;
993
994 /**
995 * The preferred principal is a node such that removing current principal
996 * and making said node new principal will lead to a larger cluster. This is
997 * updated in the non-principal state at each quantum interval and is sent
998 * out with each heartbeat pulse.
999 */
1000 cf_node preferred_principal;
1001
1002 /**
1003 * Pending join requests.
1004 */
1005 cf_vector pending_join_requests;
1006
1007 /**
1008 * The monotonic clock time when this node entered orphan state.
1009 * Will be set to zero when the node is not an orphan.
1010 */
1011 cf_clock orphan_state_start_time;
1012
1013 /**
1014 * Time when the last move command was sent.
1015 */
1016 cf_clock move_cmd_issue_time;
1017
1018 /**
1019 * Hash from nodes whom join request was sent to the time the join request
1020 * was send . Used to prevent sending join request too quickly to the same
1021 * principal again and again.
1022 */
1023 cf_shash* join_request_blackout;
1024
1025 /**
1026 * The principal to which the last join request was sent.
1027 */
1028 cf_node last_join_request_principal;
1029
1030 /**
1031 * The time at which the last join request was sent, to track and timeout
1032 * join requests.
1033 */
1034 cf_clock last_join_request_sent_time;
1035
1036 /**
1037 * The time at which the last join request was retransmitted, to track and
1038 * retransmit join requests.
1039 */
1040 cf_clock last_join_request_retransmit_time;
1041} as_clustering;
1042
1043/**
1044 * Result of sending out a join request.
1045 */
1046typedef enum as_clustering_join_request_result_e
1047{
1048 /**
1049 *
1050 * Join request was sent out.
1051 */
1052 AS_CLUSTERING_JOIN_REQUEST_SENT,
1053
1054 /**
1055 *
1056 * Join request was attempted, but sending failed.
1057 */
1058 AS_CLUSTERING_JOIN_REQUEST_SEND_FAILED,
1059
1060 /**
1061 * Join request already pending. A new join request was not sent.
1062 */
1063 AS_CLUSTERING_JOIN_REQUEST_PENDING,
1064
1065 /**
1066 * No neighboring principals present to send the join request.
1067 */
1068 AS_CLUSTERING_JOIN_REQUEST_NO_PRINCIPALS
1069} as_clustering_join_request_result;
1070
1071/**
1072 * External event publisher state.
1073 */
1074typedef struct as_clustering_external_event_publisher_s
1075{
1076 /**
1077 * State of the external event publisher.
1078 */
1079 as_clustering_sys_state sys_state;
1080
1081 /**
1082 * Inidicates if there is an event to publish.
1083 */
1084 bool event_queued;
1085
1086 /**
1087 * The pending event to publish.
1088 */
1089 as_clustering_event to_publish;
1090
1091 /**
1092 * The static succession list published with the message.
1093 */
1094 cf_vector published_succession_list;
1095
1096 /**
1097 * Conditional variable to signal pending event to publish.
1098 */
1099 pthread_cond_t is_pending;
1100
1101 /**
1102 * Thread id of the publisher thread.
1103 */
1104 pthread_t event_publisher_tid;
1105
1106 /**
1107 * Mutex to protect the conditional variable.
1108 */
1109 pthread_mutex_t is_pending_mutex;
1110} as_clustering_external_event_publisher;
1111
1112/*
1113 * ----------------------------------------------------------------------------
1114 * Forward declarations
1115 * ----------------------------------------------------------------------------
1116 */
1117static void
1118internal_event_dispatch(as_clustering_internal_event* event);
1119static bool
1120clustering_is_our_principal(cf_node nodeid);
1121static bool
1122clustering_is_principal();
1123static bool
1124clustering_is_cluster_member(cf_node nodeid);
1125
1126/*
1127 * ----------------------------------------------------------------------------
1128 * Non-public hooks to exchange subsystem.
1129 * ----------------------------------------------------------------------------
1130 */
1131extern void
1132exchange_clustering_event_listener(as_clustering_event* event);
1133
1134/*
1135 * ----------------------------------------------------------------------------
1136 * Timer, timeout values and intervals
1137 *
1138 * All values should be multiples of timer tick interval.
1139 * ----------------------------------------------------------------------------
1140 */
1141
1142/**
1143 * Timer tick interval, which should be a GCD of all clustering intervals.
1144 */
1145static uint32_t
1146timer_tick_interval()
1147{
1148 return CLUSTERING_TIMER_TICK_INTERVAL;
1149}
1150
1151/**
1152 * Maximum network latency for the cluster.
1153 */
1154static uint32_t
1155network_latency_max()
1156{
1157 return g_config.fabric_latency_max_ms;
1158}
1159
1160/**
1161 * Maximum network rtt for the cluster.
1162 */
1163static uint32_t
1164network_rtt_max()
1165{
1166 return 2 * network_latency_max();
1167}
1168
1169/**
1170 * Quantum interval in milliseconds.
1171 */
1172static uint32_t
1173quantum_interval()
1174{
1175 uint32_t std_quantum_interval = MIN(QUANTUM_INTERVAL_MAX,
1176 as_hb_node_timeout_get()
1177 + 2 * (as_hb_tx_interval_get() + network_latency_max()));
1178
1179 // Ensure we give paxos enough time to complete.
1180 return MAX(PAXOS_COMPLETION_TIME_MAX, std_quantum_interval);
1181}
1182
1183/**
1184 * Maximum number of times quantum interval start can be skipped.
1185 */
1186static uint32_t
1187quantum_interval_skip_max()
1188{
1189 return 2;
1190}
1191
1192/**
1193 * Interval at which register sync is checked.
1194 */
1195static uint32_t
1196register_sync_check_interval()
1197{
1198 return MAX(network_rtt_max(), as_hb_tx_interval_get());
1199}
1200
1201/**
1202 * Timeout for a join request, should definitely be larger than a quantum
1203 * interval to prevent the requesting node from making new requests before the
1204 * current requested principal node can finish the paxos round.
1205 */
1206static uint32_t
1207join_request_timeout()
1208{
1209 // Allow for
1210 // - 1 quantum interval, where our request lands just after the potential
1211 // principal's quantum interval start.
1212 // - 0.5 quantum intervals to give time for a paxos round to finish
1213 // - (quantum_interval_skip_max -1) intervals if the principal had to skip
1214 // quantum intervals.
1215 return (uint32_t)(
1216 (1 + 0.5 + (quantum_interval_skip_max() - 1)) * quantum_interval());
1217}
1218
1219/**
1220 * Timeout for a retransmitting a join request.
1221 */
1222static uint32_t
1223join_request_retransmit_timeout()
1224{
1225 return (uint32_t)(MIN(as_hb_tx_interval_get() / 2, quantum_interval() / 2));
1226}
1227
1228/**
1229 * The interval at which a node checks to see if it should join a cluster.
1230 */
1231static uint32_t
1232join_cluster_check_interval()
1233{
1234 return timer_tick_interval();
1235}
1236
1237/**
1238 * Blackout period for join requests to a particular principal to prevent
1239 * bombarding it with join requests. Should be less than join_request_timeout().
1240 */
1241static uint32_t
1242join_request_blackout_interval()
1243{
1244 return MIN(join_request_timeout(),
1245 MIN(quantum_interval() / 2, 2 * as_hb_tx_interval_get()));
1246}
1247
1248/**
1249 * Blackout period after sending a move command, during which join requests will
1250 * be rejected.
1251 */
1252static uint32_t
1253join_request_move_reject_interval()
1254{
1255 // Wait for one quantum interval before accepting join requests after
1256 // sending a move command.
1257 return quantum_interval();
1258}
1259
1260/**
1261 * Maximum tolerable join request transmission delay in milliseconds. Join
1262 * requests delayed by more than this amount will not be accepted.
1263 */
1264static uint32_t
1265join_request_accept_delay_max()
1266{
1267 // Join request is considered stale / delayed if the (received hlc timestamp
1268 // - send hlc timestamp) > this value;
1269 return (2 * as_hb_tx_interval_get() + network_latency_max());
1270}
1271
1272/**
1273 * Timeout in milliseconds for a paxos proposal. Give a paxos round two thirds
1274 * of an interval to timeout.
1275 * A paxos round should definitely timeout before the next quantum interval, so
1276 * that it does not delay cluster convergence.
1277 */
1278static uint32_t
1279paxos_proposal_timeout()
1280{
1281 return MAX(quantum_interval() / 2, network_rtt_max());
1282}
1283
1284/**
1285 * Timeout in milliseconds after which a paxos message is retransmitted.
1286 */
1287static uint32_t
1288paxos_msg_timeout()
1289{
1290 return MAX(MIN(quantum_interval() / 4, 100), network_rtt_max());
1291}
1292
1293/**
1294 * Maximum amount of time a node will be in orphan state. After this timeout the
1295 * node will try forming a new cluster even if there are other adjacent
1296 * clusters/nodes visible.
1297 */
1298static uint32_t
1299clustering_orphan_timeout()
1300{
1301 return UINT_MAX;
1302}
1303
1304/*
1305 * ----------------------------------------------------------------------------
1306 * Stack allocation
1307 * ----------------------------------------------------------------------------
1308 */
1309
1310/**
1311 * Maximum memory size allocated on the call stack.
1312 */
1313#define STACK_ALLOC_LIMIT() (16 * 1024)
1314
1315/**
1316 * Allocate a buffer on stack if possible. Larger buffers are heap allocated to
1317 * prevent stack overflows.
1318 */
1319#define BUFFER_ALLOC_OR_DIE(size) \
1320(((size) > STACK_ALLOC_LIMIT()) ? cf_malloc(size) : alloca(size))
1321
1322/**
1323 * Free the buffer allocated by BUFFER_ALLOC
1324 */
1325#define BUFFER_FREE(buffer, size) \
1326if (((size) > STACK_ALLOC_LIMIT()) && buffer) {cf_free(buffer);}
1327
1328/*
1329 * ----------------------------------------------------------------------------
1330 * Logging
1331 * ----------------------------------------------------------------------------
1332 */
#define LOG_LENGTH_MAX() (800)
#define CRASH(format, ...) cf_crash(AS_CLUSTERING, format, ##__VA_ARGS__)
#define WARNING(format, ...) cf_warning(AS_CLUSTERING, format, ##__VA_ARGS__)
#define INFO(format, ...) cf_info(AS_CLUSTERING, format, ##__VA_ARGS__)
#define DEBUG(format, ...) cf_debug(AS_CLUSTERING, format, ##__VA_ARGS__)
#define DETAIL(format, ...) cf_detail(AS_CLUSTERING, format, ##__VA_ARGS__)

/**
 * Log a warning when the expression is false. Wrapped in do/while(0) so the
 * macro behaves as a single statement and cannot capture an ambient `else`
 * clause at the call site (the previous bare-if expansion could).
 */
#define ASSERT(expression, message, ...) \
do { \
	if (!(expression)) {WARNING(message, ##__VA_ARGS__);} \
} while (0)

#define log_cf_node_array(message, nodes, node_count, severity) \
as_clustering_log_cf_node_array(severity, AS_CLUSTERING, message, \
		nodes, node_count)
#define log_cf_node_vector(message, nodes, severity) \
	as_clustering_log_cf_node_vector(severity, AS_CLUSTERING, message, \
			nodes)
1349
1350/*
1351 * ----------------------------------------------------------------------------
1352 * Vector functions
1353 * ----------------------------------------------------------------------------
1354 */
1355
1356/**
1357 * Clear / delete all entries in a vector.
1358 */
1359static void
1360vector_clear(cf_vector* vector)
1361{
1362 cf_vector_delete_range(vector, 0, cf_vector_size(vector));
1363}
1364
1365/**
1366 * Create temporary stack variables.
1367 */
1368#define TOKEN_PASTE(x, y) x##y
1369#define STACK_VAR(x, y) TOKEN_PASTE(x, y)
1370
1371/**
1372 * Initialize a lockless vector, initially sized to store cluster node number
1373 * of elements.
1374 */
1375#define vector_lockless_init(vectorp, value_type) \
1376({ \
1377 cf_vector_init(vectorp, sizeof(value_type), \
1378 AS_CLUSTERING_CLUSTER_MAX_SIZE_SOFT, VECTOR_FLAG_INITZERO); \
1379})
1380
1381/**
1382 * Create and initialize a lockless stack allocated vector to initially sized to
1383 * store cluster node number of elements.
1384 */
1385#define vector_stack_lockless_create(value_type) \
1386({ \
1387 cf_vector * STACK_VAR(vector, __LINE__) = (cf_vector*)alloca( \
1388 sizeof(cf_vector)); \
1389 size_t buffer_size = AS_CLUSTERING_CLUSTER_MAX_SIZE_SOFT \
1390 * sizeof(value_type); \
1391 void* STACK_VAR(buff, __LINE__) = alloca(buffer_size); cf_vector_init_smalloc( \
1392 STACK_VAR(vector, __LINE__), sizeof(value_type), \
1393 (uint8_t*)STACK_VAR(buff, __LINE__), buffer_size, \
1394 VECTOR_FLAG_INITZERO); \
1395 STACK_VAR(vector, __LINE__); \
1396})
1397
1398/**
1399 * Check two vector for equality. Two vector are euql if they have the same
1400 * number of elements and corresponding elements are equal. For now simple
1401 * memory compare is used to compare elements. Assumes the vectors are not
1402 * accessed by other threads during this operation.
1403 *
1404 * @param v1 the first vector to compare.
1405 * @param v2 the second vector to compare.
1406 * @return true if the vectors are true, false otherwise.
1407 */
1408static bool
1409vector_equals(cf_vector* v1, cf_vector* v2)
1410{
1411 int v1_count = cf_vector_size(v1);
1412 int v2_count = cf_vector_size(v2);
1413 int v1_elem_sz = VECTOR_ELEM_SZ(v1);
1414 int v2_elem_sz = VECTOR_ELEM_SZ(v2);
1415
1416 if (v1_count != v2_count || v1_elem_sz != v2_elem_sz) {
1417 return false;
1418 }
1419
1420 for (int i = 0; i < v1_count; i++) {
1421 // No null check required since we are iterating under a lock and within
1422 // vector bounds.
1423 void* v1_element = cf_vector_getp(v1, i);
1424 void* v2_element = cf_vector_getp(v2, i);
1425
1426 if (v1_element == v2_element) {
1427 // Same reference or both are NULL.
1428 continue;
1429 }
1430
1431 if (v1_element == NULL || v2_element == NULL) {
1432 // Exactly one reference is NULL.
1433 return false;
1434 }
1435
1436 if (memcmp(v1_element, v2_element, v1_elem_sz) != 0) {
1437 return false;
1438 }
1439 }
1440
1441 return true;
1442}
1443
1444/**
1445 * Find the index of an element in the vector. Equality is based on mem compare.
1446 *
1447 * @param vector the source vector.
1448 * @param element the element to find.
1449 * @return the index if the element is found, -1 otherwise.
1450 */
1451static int
1452vector_find(cf_vector* vector, void* element)
1453{
1454 int element_count = cf_vector_size(vector);
1455 size_t value_len = VECTOR_ELEM_SZ(vector);
1456 for (int i = 0; i < element_count; i++) {
1457 // No null check required since we are iterating under a lock and within
1458 // vector bounds.
1459 void* src_element = cf_vector_getp(vector, i);
1460 if (src_element) {
1461 if (memcmp(element, src_element, value_len) == 0) {
1462 return i;
1463 }
1464 }
1465 }
1466 return -1;
1467}
1468
1469/**
1470 * Copy all elements form the source vector to the destination vector to the
1471 * destination vector. Assumes the source and destination vector are not being
1472 * modified while the copy operation is in progress.
1473 *
1474 * @param dest the destination vector.
1475 * @param src the source vector.
1476 * @return the number of elements copied.
1477 */
1478static int
1479vector_copy(cf_vector* dest, cf_vector* src)
1480{
1481 int element_count = cf_vector_size(src);
1482 int copied_count = 0;
1483 for (int i = 0; i < element_count; i++) {
1484 // No null check required since we are iterating under a lock and within
1485 // vector bounds.
1486 void* src_element = cf_vector_getp(src, i);
1487 if (src_element) {
1488 cf_vector_append(dest, src_element);
1489 copied_count++;
1490 }
1491 }
1492 return copied_count;
1493}
1494
1495/**
1496 * Copy all elements form the source vector to the destination vector only if
1497 * they do not exist in the destination vector. Assumes the source and
1498 * destination vector are not being modified while the copy operation is in
1499 * progress.
1500 *
1501 * @param dest the destination vector.
1502 * @param src the source vector.
1503 * @return the number of elements copied.
1504 */
1505static int
1506vector_copy_unique(cf_vector* dest, cf_vector* src)
1507{
1508 int element_count = cf_vector_size(src);
1509 int copied_count = 0;
1510 for (int i = 0; i < element_count; i++) {
1511 // No null check required since we are iterating under a lock and within
1512 // vector bounds.
1513 void* src_element = cf_vector_getp(src, i);
1514 if (src_element) {
1515 cf_vector_append_unique(dest, src_element);
1516 copied_count++;
1517 }
1518 }
1519 return copied_count;
1520}
1521
1522/**
1523 * Sorts in place the elements in the vector using the inout comparator function
1524 * and retains only unique elements. Assumes the source vector is not being
1525 * modified while the sort operation is in progress.
1526 *
1527 * @param src the source vector.
1528 * @return comparator the comparator function, which must return an integer less
1529 * than, equal to, or greater than zero if the first argument is considered to
1530 * be respectively less than, equal to, or greater than the second
1531 */
1532static void
1533vector_sort_unique(cf_vector* src, int
1534(*comparator)(const void*, const void*))
1535{
1536 int element_count = cf_vector_size(src);
1537 size_t value_len = VECTOR_ELEM_SZ(src);
1538 size_t array_size = element_count * value_len;
1539 void* element_array = BUFFER_ALLOC_OR_DIE(array_size);
1540
1541 // A lame approach to sorting. Copying the elements to an array and invoking
1542 // qsort.
1543 uint8_t* next_element_ptr = element_array;
1544 int array_element_count = 0;
1545 for (int i = 0; i < element_count; i++) {
1546 // No null check required since we are iterating under a lock and within
1547 // vector bounds.
1548 void* src_element = cf_vector_getp(src, i);
1549 if (src_element) {
1550 memcpy(next_element_ptr, src_element, value_len);
1551 next_element_ptr += value_len;
1552 array_element_count++;
1553 }
1554 }
1555
1556 qsort(element_array, array_element_count, value_len, comparator);
1557
1558 vector_clear(src);
1559 next_element_ptr = element_array;
1560 for (int i = 0; i < array_element_count; i++) {
1561 cf_vector_append_unique(src, next_element_ptr);
1562 next_element_ptr += value_len;
1563 }
1564
1565 BUFFER_FREE(element_array, array_size);
1566 return;
1567}
1568
1569/**
1570 * Remove all elements from the to_remove vector present in the target vector.
1571 * Equality is based on simple mem compare.
1572 *
1573 * @param target the target vector being modified.
1574 * @param to_remove the vector whose elements must be removed from the target.
1575 * @return the number of elements removed.
1576 */
1577static int
1578vector_subtract(cf_vector* target, cf_vector* to_remove)
1579{
1580 int element_count = cf_vector_size(to_remove);
1581 int removed_count = 0;
1582 for (int i = 0; i < element_count; i++) {
1583 // No null check required since we are iterating under a lock and within
1584 // vector bounds.
1585 void* to_remove_element = cf_vector_getp(to_remove, i);
1586 if (to_remove_element) {
1587 int found_at = 0;
1588 while ((found_at = vector_find(target, to_remove_element)) >= 0) {
1589 cf_vector_delete(target, found_at);
1590 removed_count++;
1591 }
1592 }
1593 }
1594
1595 return removed_count;
1596}
1597
1598/**
1599 * Convert a vector to an array.
1600 * FIXME: return pointer to the internal vector storage.
1601 */
1602static cf_node*
1603vector_to_array(cf_vector* vector)
1604{
1605 return (cf_node*)vector->vector;
1606}
1607
1608/**
1609 * Copy elements in a vector to an array.
1610 * @param array the destination array. Should be large enough to hold the number
1611 * all elements in the vector.
1612 * @param src the source vector.
1613 * @param element_count the number of elements to copy from the source vector.
1614 */
1615static void
1616vector_array_cpy(void* array, cf_vector* src, int element_count)
1617{
1618 uint8_t* element_ptr = array;
1619 int element_size = VECTOR_ELEM_SZ(src);
1620 for (int i = 0; i < element_count; i++) {
1621 cf_vector_get(src, i, element_ptr);
1622 element_ptr += element_size;
1623 }
1624}
1625
1626/*
1627 * ----------------------------------------------------------------------------
1628 * Globals
1629 * ----------------------------------------------------------------------------
1630 */
1631
1632/**
1633 * The big fat lock for all clustering state.
1634 */
1635static pthread_mutex_t g_clustering_lock =
1636 PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
1637
1638/**
1639 * The fat lock for all clustering events listener changes.
1640 */
1641static pthread_mutex_t g_clustering_event_publisher_lock =
1642 PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
1643
1644/**
1645 * Debugging lock acquition.
1646 * #define LOCK_DEBUG_ENABLED 1
1647 */
1648#ifdef LOCK_DEBUG_ENABLED
1649#define LOCK_DEBUG(format, ...) DEBUG(format, ##__VA_ARGS__)
1650#else
1651#define LOCK_DEBUG(format, ...)
1652#endif
1653
1654/**
1655 * Acquire a lock on the clustering module.
1656 */
1657#define CLUSTERING_LOCK() \
1658({ \
1659 pthread_mutex_lock (&g_clustering_lock); \
1660 LOCK_DEBUG("locked in %s", __FUNCTION__); \
1661})
1662
1663/**
1664 * Relinquish the lock on the clustering module.
1665 */
1666#define CLUSTERING_UNLOCK() \
1667({ \
1668 pthread_mutex_unlock (&g_clustering_lock); \
1669 LOCK_DEBUG("unLocked in %s", __FUNCTION__); \
1670})
1671
1672/**
1673 * Acquire a lock on the clustering publisher.
1674 */
1675#define CLUSTERING_EVENT_PUBLISHER_LOCK() \
1676({ \
1677 pthread_mutex_lock (&g_clustering_event_publisher_lock); \
1678 LOCK_DEBUG("publisher locked in %s", __FUNCTION__); \
1679})
1680
1681/**
1682 * Relinquish the lock on the clustering publisher.
1683 */
1684#define CLUSTERING_EVENT_PUBLISHER_UNLOCK() \
1685({ \
1686 pthread_mutex_unlock (&g_clustering_event_publisher_lock); \
1687 LOCK_DEBUG("publisher unLocked in %s", __FUNCTION__); \
1688})
1689
1690/**
1691 * Singleton timer.
1692 */
1693static as_clustering_timer g_timer;
1694
1695/**
1696 * Singleton external events publisher.
1697 */
1698static as_clustering_external_event_publisher g_external_event_publisher;
1699
1700/**
1701 * Singleton cluster register to store this node's cluster membership.
1702 */
1703static as_clustering_register g_register;
1704
1705/**
1706 * Singleton clustrering state all initialized to zero.
1707 */
1708static as_clustering g_clustering = { 0 };
1709
1710/**
1711 * Singleton paxos proposer.
1712 */
1713static as_paxos_proposer g_proposer;
1714
1715/**
1716 * Singleton paxos acceptor.
1717 */
1718static as_paxos_acceptor g_acceptor;
1719
1720/**
1721 * Singleton quantum interval generator.
1722 */
1723static as_clustering_quantum_interval_generator g_quantum_interval_generator;
1724
1725/**
1726 * Message template for heart beat messages.
1727 */
1728static msg_template g_clustering_msg_template[] = {
1729
1730{ AS_CLUSTERING_MSG_ID, M_FT_UINT32 },
1731
1732{ AS_CLUSTERING_MSG_TYPE, M_FT_UINT32 },
1733
1734{ AS_CLUSTERING_MSG_HLC_TIMESTAMP, M_FT_UINT64 },
1735
1736{ AS_CLUSTERING_MSG_SEQUENCE_NUMBER, M_FT_UINT64 },
1737
1738{ AS_CLUSTERING_MSG_CLUSTER_KEY, M_FT_UINT64 },
1739
1740{ AS_CLUSTERING_MSG_SUCCESSION_LIST, M_FT_BUF },
1741
1742{ AS_CLUSTERING_MSG_PROPOSED_PRINCIPAL, M_FT_UINT64 }
1743
1744};
1745
1746/*
1747 * ----------------------------------------------------------------------------
1748 * Clustering life cycle
1749 * ----------------------------------------------------------------------------
1750 */
1751
1752/**
1753 * Check if clustering is initialized.
1754 */
1755static bool
1756clustering_is_initialized()
1757{
1758 CLUSTERING_LOCK();
1759 bool initialized = (g_clustering.sys_state
1760 != AS_CLUSTERING_SYS_STATE_UNINITIALIZED);
1761 CLUSTERING_UNLOCK();
1762 return initialized;
1763}
1764
1765/**
1766 * * Check if clustering is running.
1767 */
1768static bool
1769clustering_is_running()
1770{
1771 CLUSTERING_LOCK();
1772 bool running = g_clustering.sys_state == AS_CLUSTERING_SYS_STATE_RUNNING;
1773 CLUSTERING_UNLOCK();
1774 return running;
1775}
1776
1777/*
1778 * ----------------------------------------------------------------------------
1779 * Config related functions
1780 * ----------------------------------------------------------------------------
1781 */
1782
1783/**
1784 * The nodeid for this node.
1785 */
1786static cf_node
1787config_self_nodeid_get()
1788{
1789 return g_config.self_node;
1790}
1791
1792/*
1793 * ----------------------------------------------------------------------------
1794 * Compatibility mode functions
1795 * ----------------------------------------------------------------------------
1796 */
1797
1798/**
1799 * Return current protocol version identifier.
1800 */
1801as_cluster_proto_identifier
1802clustering_protocol_identifier_get()
1803{
1804 return 0x707C;
1805}
1806
1807/**
1808 * Compare clustering protocol versions for compatibility.
1809 */
1810bool
1811clustering_versions_are_compatible(as_cluster_proto_identifier v1,
1812 as_cluster_proto_identifier v2)
1813{
1814 return v1 == v2;
1815}
1816
1817/*
1818 * ----------------------------------------------------------------------------
1819 * Timer event generator
1820 *
1821 * TODO: Can be abstracted out as a single scheduler single utility across
1822 * modules.
1823 * ----------------------------------------------------------------------------
1824 */
1825
/**
 * Initialize the timer state. Must run before timer_start().
 */
static void
timer_init()
{
	CLUSTERING_LOCK();
	// Zero all timer state, in particular the timer thread id.
	memset(&g_timer, 0, sizeof(g_timer));
	CLUSTERING_UNLOCK();
}
1833
1834/**
1835 * Clustering timer event generator thread, to help with retries and retransmits
1836 * across all states.
1837 */
1838static void*
1839timer_thr(void* arg)
1840{
1841 as_clustering_internal_event timer_event;
1842 memset(&timer_event, 0, sizeof(timer_event));
1843 timer_event.type = AS_CLUSTERING_INTERNAL_EVENT_TIMER;
1844
1845 while (clustering_is_running()) {
1846 // Wait for a while and retry.
1847 internal_event_dispatch(&timer_event);
1848 usleep(timer_tick_interval() * 1000);
1849 }
1850
1851 return NULL;
1852}
1853
1854/**
1855 * Start the timer.
1856 */
1857static void
1858timer_start()
1859{
1860 CLUSTERING_LOCK();
1861 g_timer.timer_tid = cf_thread_create_joinable(timer_thr, NULL);
1862 CLUSTERING_UNLOCK();
1863}
1864
1865/**
1866 * Stop the timer.
1867 */
1868static void
1869timer_stop()
1870{
1871 CLUSTERING_LOCK();
1872 cf_thread_join(g_timer.timer_tid);
1873 CLUSTERING_UNLOCK();
1874}
1875
1876/*
1877 * ----------------------------------------------------------------------------
1878 * Heartbeat subsystem interfacing
1879 * ----------------------------------------------------------------------------
1880 */
1881
1882/*
 * The structure of the data the clustering subsystem pushes within hb pulse
 * messages, and retains as plugin data, is as follows.
1885 *
1886 * Each row occupies 4 bytes.
1887 *
1888 * V5 heartbeat wire payload structure.
1889 * ===============================
1890 *
1891 * ------------|-------------|------------|------------|
1892 * | Clustering Protocol identifier |
1893 * |---------------------------------------------------|
1894 * | |
1895 * |-------- Cluster Key ------------------------------|
1896 * | |
1897 * |---------------------------------------------------|
1898 * | |
1899 * |-------- Paxos sequence number --------------------|
1900 * | |
1901 * |---------------------------------------------------|
1902 * | |
1903 * |-------- Preferred principal ----------------------|
1904 * | |
1905 * |---------------------------------------------------|
1906 * | Length of succession list |
1907 * |---------------------------------------------------|
1908 * | |
1909 * |-------- Succ. Node id 0 --------------------------|
1910 * | |
1911 * |---------------------------------------------------|
1912 * | |
1913 * |-------- Succ. Node id 1 --------------------------|
1914 * | |
1915 * |---------------------------------------------------|
1916 * | . |
1917 * | . |
1918 *
1919 *
1920 * Cluster key and succession lists helps with detecting cluster integrity,
1921 * Plain clusterkey should be good enough but matching succession lists adds to
1922 * another level of safety (may not be required but being cautious).
1923 *
 * For an orphaned node, the cluster key and the succession list length are set
 * to zero.
1925 *
 * The parsed hb plugin data has the same layout as the wire payload structure.
 * The plugin code ensures invalid content will never be parsed into memory as
 * plugin data. The direct implication is that if plugin data is not NULL, the
 * required fields
1930 * - Clustering protocol identifier
1931 * - Cluster key
1932 * - Succession list length will always be present when read back from the
1933 * heartbeat subsystem and the succession list will be consistent with the
1934 * succession list length.
1935 */
1936
1937/**
1938 * Read plugin data from hb layer for a node, using stack allocated space.
1939 * Will attempt a max of 3 attempts before crashing.
1940 * plugin_data_p->data_size will be zero and plugin_data_p->data will be NULL if
1941 * an entry for the node does not exist.
1942 */
1943#define clustering_hb_plugin_data_get(nodeid, plugin_data_p, \
1944 hb_msg_hlc_ts_p, msg_recv_ts_p) \
1945({ \
1946 (plugin_data_p)->data_capacity = 1024; \
1947 int tries_remaining = 3; \
1948 bool enoent = false; \
1949 bool rv = -1; \
1950 while (tries_remaining--) { \
1951 (plugin_data_p)->data = alloca((plugin_data_p)->data_capacity); \
1952 if (as_hb_plugin_data_get(nodeid, AS_HB_PLUGIN_CLUSTERING, \
1953 plugin_data_p, hb_msg_hlc_ts_p, msg_recv_ts_p) == 0) { \
1954 rv = 0; \
1955 break; \
1956 } \
1957 if (errno == ENOENT) { \
1958 enoent = true; \
1959 break; \
1960 } \
1961 if (errno == ENOMEM) { \
1962 (plugin_data_p)->data_capacity = (plugin_data_p)->data_size; \
1963 } \
1964 } \
1965 if (rv != 0 && !enoent && tries_remaining < 0) { \
1966 CRASH("error allocating space for paxos hb plugin data"); \
1967 } \
1968 if (enoent) { \
1969 (plugin_data_p)->data_size = 0; \
1970 (plugin_data_p)->data = NULL; \
1971 } \
1972 rv; \
1973})
1974
1975/**
1976 * Get a pointer to the protocol identifier inside plugin data. Will be NULL if
1977 * plugin data is null or there are not enough bytes in the data to hold the
1978 * identifier.
1979 * @param plugin_data can be NULL.
1980 * @param plugin_data_size the size of plugin data.
1981 * @return pointer to the protocol identifier on success, NULL on failure.
1982 */
1983static as_cluster_proto_identifier*
1984clustering_hb_plugin_proto_get(void* plugin_data, size_t plugin_data_size)
1985{
1986 if (plugin_data == NULL
1987 || plugin_data_size < sizeof(as_cluster_proto_identifier)) {
1988 // The data does not hold valid data or there is no cluster key and or
1989 // succession list is missing.
1990 return NULL;
1991 }
1992
1993 return (as_cluster_proto_identifier*)plugin_data;
1994}
1995
1996/**
1997 * Retrieves the cluster key from clustering hb plugin data.
1998 * @param plugin_data can be NULL.
1999 * @param plugin_data_size the size of plugin data.
2000 * @return pointer to the cluster key on success, NULL on failure.
2001 */
2002static as_cluster_key*
2003clustering_hb_plugin_cluster_key_get(void* plugin_data, size_t plugin_data_size)
2004{
2005 uint8_t* proto = (uint8_t*)clustering_hb_plugin_proto_get(plugin_data,
2006 plugin_data_size);
2007 if (proto == NULL) {
2008 // The data does not hold valid data.
2009 return NULL;
2010 }
2011
2012 if ((uint8_t*)plugin_data + plugin_data_size
2013 < proto + sizeof(as_cluster_proto_identifier)
2014 + sizeof(as_cluster_key)) {
2015 // Not enough bytes for cluster key.
2016 return NULL;
2017 }
2018
2019 return (as_cluster_key*)(proto + sizeof(as_cluster_proto_identifier));
2020}
2021
2022/**
2023 * Retrieves the sequence number from clustering hb plugin data.
2024 * @param plugin_data can be NULL.
2025 * @param plugin_data_size the size of plugin data.
2026 * @return pointer to the sequence number on success, NULL on failure.
2027 */
2028static as_paxos_sequence_number*
2029clustering_hb_plugin_sequence_number_get(void* plugin_data,
2030 size_t plugin_data_size)
2031{
2032 uint8_t* cluster_key = (uint8_t*)clustering_hb_plugin_cluster_key_get(
2033 plugin_data, plugin_data_size);
2034 if (cluster_key == NULL) {
2035 // The data does not hold valid data or there is no cluster key.
2036 return NULL;
2037 }
2038
2039 if ((uint8_t*)plugin_data + plugin_data_size
2040 < cluster_key + sizeof(as_cluster_key)
2041 + sizeof(as_paxos_sequence_number)) {
2042 // Not enough bytes for succession list length.
2043 return NULL;
2044 }
2045
2046 return (as_paxos_sequence_number*)(cluster_key + sizeof(as_cluster_key));
2047}
2048
2049/**
2050 * Retrieves the preferred principal from clustering hb plugin data.
2051 * @param plugin_data can be NULL.
2052 * @param plugin_data_size the size of plugin data.
2053 * @return pointer to the preferred principal on success, NULL on failure.
2054 */
2055static cf_node*
2056clustering_hb_plugin_preferred_principal_get(void* plugin_data,
2057 size_t plugin_data_size)
2058{
2059 uint8_t* sequence_number_p =
2060 (uint8_t*)clustering_hb_plugin_sequence_number_get(plugin_data,
2061 plugin_data_size);
2062 if (sequence_number_p == NULL) {
2063 // The data does not hold valid data or there is no sequence number.
2064 return NULL;
2065 }
2066
2067 if ((uint8_t*)plugin_data + plugin_data_size
2068 < sequence_number_p + sizeof(as_paxos_sequence_number)
2069 + sizeof(cf_node)) {
2070 // Not enough bytes for preferred principal.
2071 return NULL;
2072 }
2073
2074 return (as_paxos_sequence_number*)(sequence_number_p
2075 + sizeof(as_paxos_sequence_number));
2076}
2077
2078/**
2079 * Retrieves the succession list length pointer from clustering hb plugin data.
2080 * @param plugin_data can be NULL.
2081 * @param plugin_data_size the size of plugin data.
2082 * @return pointer to succession list length on success, NULL on failure.
2083 */
2084static uint32_t*
2085clustering_hb_plugin_succession_length_get(void* plugin_data,
2086 size_t plugin_data_size)
2087{
2088 uint8_t* preferred_principal_p =
2089 (uint8_t*)clustering_hb_plugin_preferred_principal_get(plugin_data,
2090 plugin_data_size);
2091 if (preferred_principal_p == NULL) {
2092 // The data does not hold valid data or there is no preferred principal
2093 // and or succession list is missing.
2094 return NULL;
2095 }
2096
2097 if ((uint8_t*)plugin_data + plugin_data_size
2098 < preferred_principal_p + sizeof(cf_node) + sizeof(uint32_t)) {
2099 // Not enough bytes for succession list length.
2100 return NULL;
2101 }
2102
2103 return (uint32_t*)(preferred_principal_p + sizeof(cf_node));
2104}
2105
2106/**
2107 * Retrieves the pointer to the first node in the succession list.
2108 * @param plugin_data can be NULL.
2109 * @param plugin_data_size the size of plugin data.
2110 * @return pointer to first node in succession list on success, NULL on failure
2111 * or if the succession list is empty.
2112 */
2113static cf_node*
2114clustering_hb_plugin_succession_get(void* plugin_data, size_t plugin_data_size)
2115{
2116 uint8_t* succession_list_length_p =
2117 (uint8_t*)clustering_hb_plugin_succession_length_get(plugin_data,
2118 plugin_data_size);
2119 if (succession_list_length_p == NULL) {
2120 // The data does not hold valid data or there is no cluster key and or
2121 // succession list is missing.
2122 return NULL;
2123 }
2124
2125 if (*(uint32_t*)succession_list_length_p == 0) {
2126 // Empty succession list.
2127 return NULL;
2128 }
2129
2130 if ((uint8_t*)plugin_data + plugin_data_size
2131 < succession_list_length_p + sizeof(uint32_t)
2132 + (sizeof(cf_node) * (*(uint32_t*)succession_list_length_p))) {
2133 // Not enough bytes for succession list length.
2134 return NULL;
2135 }
2136
2137 return (cf_node*)(succession_list_length_p + sizeof(uint32_t));
2138}
2139
2140/**
2141 * Validate the correctness of plugin data. By ensuring all required fields are
2142 * present and the succession list matches the provided length.
2143 * @param plugin_data can be NULL.
2144 * @param plugin_data_size the size of plugin data.
2145 * @return pointer to first node in succession list on success, NULL on failure.
2146 */
2147static bool
2148clustering_hb_plugin_data_is_valid(void* plugin_data, size_t plugin_data_size)
2149{
2150 void* proto_identifier_p = clustering_hb_plugin_proto_get(plugin_data,
2151 plugin_data_size);
2152 if (proto_identifier_p == NULL) {
2153 DEBUG("plugin data missing protocol identifier");
2154 return false;
2155 }
2156
2157 as_cluster_proto_identifier current_proto_identifier =
2158 clustering_protocol_identifier_get();
2159 if (!clustering_versions_are_compatible(current_proto_identifier,
2160 *(as_cluster_proto_identifier*)proto_identifier_p)) {
2161 DEBUG("protocol versions incompatible - expected %"PRIx32" but was: %"PRIx32,
2162 current_proto_identifier,
2163 *(as_cluster_proto_identifier*)proto_identifier_p);
2164 return false;
2165 }
2166
2167 void* cluster_key_p = clustering_hb_plugin_cluster_key_get(plugin_data,
2168 plugin_data_size);
2169 if (cluster_key_p == NULL) {
2170 DEBUG("plugin data missing cluster key");
2171 return false;
2172 }
2173
2174 void* sequence_number_p = clustering_hb_plugin_sequence_number_get(
2175 plugin_data, plugin_data_size);
2176 if (sequence_number_p == NULL) {
2177 DEBUG("plugin data missing sequence number");
2178 return false;
2179 }
2180
2181 void* preferred_principal_p = clustering_hb_plugin_preferred_principal_get(
2182 plugin_data, plugin_data_size);
2183 if (preferred_principal_p == NULL) {
2184 DEBUG("plugin data missing preferred principal");
2185 return false;
2186 }
2187
2188 uint32_t* succession_list_length_p =
2189 (void*)clustering_hb_plugin_succession_length_get(plugin_data,
2190 plugin_data_size);
2191 if (succession_list_length_p == NULL) {
2192 DEBUG("plugin data missing succession list length");
2193 return false;
2194 }
2195
2196 void* succession_list_p = clustering_hb_plugin_succession_get(plugin_data,
2197 plugin_data_size);
2198
2199 if (*succession_list_length_p > 0 && succession_list_p == NULL) {
2200 DEBUG("succession list length %d, but succession list is empty",
2201 *succession_list_length_p);
2202 return false;
2203 }
2204
2205 return true;
2206}
2207
2208/**
2209 * Determines if the plugin data with hb subsystem is old to be ignored.
2210 * ALL access to plugin data should be vetted through this function. The plugin
2211 * data is obsolete if it was send before the current cluster state or has a
2212 * version mismatch.
2213 *
2214 * This is detemined by comparing the plugin data hb message hlc timestamp and
2215 * monotonic timestamps with the cluster formation hlc and monotonic times.
2216 *
2217 * @param cluster_modified_hlc_ts the hlc timestamp when current cluster change
2218 * happened. Sent to avoid locking in this function.
2219 * @param cluster_modified_time the monotonic timestamp when current cluster
2220 * change happened. Sento to avoid locking in this function.
2221 * @param plugin_data the plugin data.
2222 * @param plugin_data_size the size of plugin data.
2223 * @param msg_recv_ts the monotonic timestamp for plugin data receive.
2224 * @param hb_msg_hlc_ts the hlc timestamp for plugin data receive.
2225 * @return true if plugin data is obsolete, false otherwise.
2226 */
2227static bool
2228clustering_hb_plugin_data_is_obsolete(as_hlc_timestamp cluster_modified_hlc_ts,
2229 cf_clock cluster_modified_time, void* plugin_data,
2230 size_t plugin_data_size, cf_clock msg_recv_ts,
2231 as_hlc_msg_timestamp* hb_msg_hlc_ts)
2232{
2233 if (!clustering_hb_plugin_data_is_valid(plugin_data, plugin_data_size)) {
2234 // Plugin data is invalid. Assume it to be obsolete.
2235 // Seems like a redundant check but required in case clustering protocol
2236 // was switched to an incompatible version.
2237 return true;
2238 }
2239
2240 if (as_hlc_send_timestamp_order(cluster_modified_hlc_ts, hb_msg_hlc_ts)
2241 != AS_HLC_HAPPENS_BEFORE) {
2242 // Cluster formation time after message send or the order is unknown,
2243 // assume cluster formation is after message send. the caller should
2244 // ignore this message.
2245 return true;
2246 }
2247
2248 // HB data should be atleast after cluster formation time + one hb interval
2249 // to send out our cluster state + one network delay for our information to
2250 // reach the remote node + one hb interval for the other node to send out
2251 // the his updated state + one network delay for the updated state to reach
2252 // us.
2253 if (cluster_modified_time + 2 * as_hb_tx_interval_get()
2254 + 2 * g_config.fabric_latency_max_ms > msg_recv_ts) {
2255 return true;
2256 }
2257
2258 return false;
2259}
2260
2261/**
2262 * Indicates if the plugin data for a node indicates that it is an orphan node.
2263 */
2264static as_clustering_peer_node_state
2265clustering_hb_plugin_data_node_status(void* plugin_data,
2266 size_t plugin_data_size)
2267{
2268 if (!clustering_hb_plugin_data_is_valid(plugin_data, plugin_data_size)) {
2269 // Either we have not hb channel to this node or it has sen invalid
2270 // plugin data. Assuming the cluster state is unknown.
2271 return AS_NODE_UNKNOWN;
2272 }
2273
2274 as_cluster_key* cluster_key = clustering_hb_plugin_cluster_key_get(
2275 plugin_data, plugin_data_size);
2276
2277 if (*cluster_key == 0) {
2278 return AS_NODE_ORPHAN;
2279 }
2280
2281 // Redundant paranoid check.
2282 uint32_t* succession_list_length_p =
2283 clustering_hb_plugin_succession_length_get(plugin_data,
2284 plugin_data_size);
2285
2286 if (*succession_list_length_p == 0) {
2287 return AS_NODE_ORPHAN;
2288 }
2289
2290 return AS_NODE_CLUSTER_ASSIGNED;
2291}
2292
2293/**
2294 * Push clustering payload into a heartbeat pulse message. The payload format is
2295 * as described above.
2296 */
2297static void
2298clustering_hb_plugin_set_fn(msg* msg)
2299{
2300 if (!clustering_is_initialized()) {
2301 // Clustering not initialized. Send no data at all.
2302 return;
2303 }
2304
2305 CLUSTERING_LOCK();
2306
2307 uint32_t cluster_size = cf_vector_size(&g_register.succession_list);
2308
2309 size_t payload_size =
2310 // For the paxos version identifier
2311 sizeof(uint32_t)
2312 // For cluster key
2313 + sizeof(as_cluster_key)
2314 // For sequence number
2315 + sizeof(as_paxos_sequence_number)
2316 // For preferred principal
2317 + sizeof(cf_node)
2318 // For succession list length.
2319 + sizeof(uint32_t)
2320 // For succession list.
2321 + (sizeof(cf_node) * cluster_size);
2322
2323 uint8_t* payload = alloca(payload_size);
2324
2325 uint8_t* current_field_p = payload;
2326
2327 // Set the paxos protocol identifier.
2328 uint32_t protocol = clustering_protocol_identifier_get();
2329 memcpy(current_field_p, &protocol, sizeof(protocol));
2330 current_field_p += sizeof(protocol);
2331
2332 // Set cluster key.
2333 memcpy(current_field_p, &g_register.cluster_key,
2334 sizeof(g_register.cluster_key));
2335 current_field_p += sizeof(g_register.cluster_key);
2336
2337 // Set the sequence number.
2338 memcpy(current_field_p, &g_register.sequence_number,
2339 sizeof(g_register.sequence_number));
2340 current_field_p += sizeof(g_register.sequence_number);
2341
2342 // Set the preferred principal.
2343 memcpy(current_field_p, &g_clustering.preferred_principal,
2344 sizeof(g_clustering.preferred_principal));
2345 current_field_p += sizeof(g_clustering.preferred_principal);
2346
2347 // Set succession length
2348 memcpy(current_field_p, &cluster_size, sizeof(cluster_size));
2349 current_field_p += sizeof(cluster_size);
2350
2351 // Copy over the succession list.
2352 cf_node* succession = (cf_node*)(current_field_p);
2353 for (int i = 0; i < cluster_size; i++) {
2354 cf_vector_get(&g_register.succession_list, i, &succession[i]);
2355 }
2356
2357 msg_set_buf(msg, AS_HB_MSG_PAXOS_DATA, payload, payload_size, MSG_SET_COPY);
2358
2359 CLUSTERING_UNLOCK();
2360}
2361
2362/**
2363 * Plugin parse function that copies the msg payload verbatim to a plugin data.
2364 */
2365static void
2366clustering_hb_plugin_parse_data_fn(msg* msg, cf_node source,
2367 as_hb_plugin_node_data* prev_plugin_data,
2368 as_hb_plugin_node_data* plugin_data)
2369{
2370 // Lockless check to prevent deadlocks.
2371 if (g_clustering.sys_state == AS_CLUSTERING_SYS_STATE_UNINITIALIZED) {
2372 // Ignore this heartbeat.
2373 plugin_data->data_size = 0;
2374 return;
2375 }
2376
2377 void* payload;
2378 size_t payload_size;
2379
2380 if (msg_get_buf(msg, AS_HB_MSG_PAXOS_DATA, (uint8_t**)&payload,
2381 &payload_size, MSG_GET_DIRECT) != 0) {
2382 cf_ticker_warning(AS_CLUSTERING,
2383 "received empty clustering payload in heartbeat pulse from node %"PRIx64,
2384 source);
2385 plugin_data->data_size = 0;
2386 return;
2387 }
2388
2389 // Validate and retain only valid plugin data.
2390 if (!clustering_hb_plugin_data_is_valid(payload, payload_size)) {
2391 cf_ticker_warning(AS_CLUSTERING,
2392 "received invalid clustering payload in heartbeat pulse from node %"PRIx64,
2393 source);
2394 plugin_data->data_size = 0;
2395 return;
2396 }
2397
2398 if (payload_size > plugin_data->data_capacity) {
2399 // Round up to nearest multiple of block size to prevent very frequent
2400 // reallocation.
2401 size_t data_capacity = ((payload_size + HB_PLUGIN_DATA_BLOCK_SIZE - 1)
2402 / HB_PLUGIN_DATA_BLOCK_SIZE) * HB_PLUGIN_DATA_BLOCK_SIZE;
2403
2404 // Reallocate since we have outgrown existing capacity.
2405 plugin_data->data = cf_realloc(plugin_data->data, data_capacity);
2406 plugin_data->data_capacity = data_capacity;
2407 }
2408
2409 plugin_data->data_size = payload_size;
2410 memcpy(plugin_data->data, payload, payload_size);
2411}
2412
2413/**
2414 * Check if the input succession list from hb plugin data matches, with a
2415 * succession list vector.
2416 * @param succession_list the first succession list.
2417 * @param succession_list_length the length of the succession list.
2418 * @param succession_list_vector the second succession list as a vector. Should
2419 * be protected from multithreaded access while this function is running.
2420 * @return true if the succcession lists are equal, false otherwise.
2421 */
2422bool
2423clustering_hb_succession_list_matches(cf_node* succession_list,
2424 uint32_t succession_list_length, cf_vector* succession_list_vector)
2425{
2426 if (succession_list_length != cf_vector_size(succession_list_vector)) {
2427 return false;
2428 }
2429
2430 for (uint32_t i = 0; i < succession_list_length; i++) {
2431 cf_node* vector_element = cf_vector_getp(succession_list_vector, i);
2432 if (vector_element == NULL || *vector_element != succession_list[i]) {
2433 return false;
2434 }
2435 }
2436 return true;
2437}
2438
2439/*
2440 * ----------------------------------------------------------------------------
2441 * Quantum interval generator
2442 * ----------------------------------------------------------------------------
2443 */
2444
2445/**
2446 * Time taken for the effect of a fault to get propogated via HB.
2447 */
2448static uint32_t
2449quantum_interval_hb_fault_comm_delay()
2450{
2451 return as_hb_tx_interval_get() + network_latency_max();
2452}
2453
2454/**
2455 * Quantum wait time after node arrived event.
2456 */
2457static uint32_t
2458quantum_interval_node_arrived_wait_time(as_clustering_quantum_fault* fault)
2459{
2460 return MIN(quantum_interval(),
2461 (fault->last_event_ts - fault->event_ts) / 2
2462 + 2 * quantum_interval_hb_fault_comm_delay()
2463 + quantum_interval() / 2);
2464}
2465
2466/**
2467 * Quantum wait time after node departs.
2468 */
2469static uint32_t
2470quantum_interval_node_departed_wait_time(as_clustering_quantum_fault* fault)
2471{
2472 return MIN(quantum_interval(),
2473 as_hb_node_timeout_get()
2474 + 2 * quantum_interval_hb_fault_comm_delay()
2475 + quantum_interval() / 4);
2476}
2477
2478/**
2479 * Quantum wait time after a peer nodes adjacency changed.
2480 */
2481static uint32_t
2482quantum_interval_peer_adjacency_changed_wait_time(
2483 as_clustering_quantum_fault* fault)
2484{
2485 return MIN(quantum_interval(), quantum_interval_hb_fault_comm_delay());
2486}
2487
2488/**
2489 * Quantum wait time after accepting a join request.
2490 */
2491static uint32_t
2492quantum_interval_join_accepted_wait_time(as_clustering_quantum_fault* fault)
2493{
2494 // Ensure we wait for atleast one heartbeat interval to receive the latest
2495 // heartbeat after the last join request and for other nodes to send their
2496 // join requests as well.
2497 return MIN(quantum_interval(),
2498 (fault->last_event_ts - fault->event_ts)
2499 + join_cluster_check_interval() + network_latency_max()
2500 + as_hb_tx_interval_get());
2501}
2502
2503/**
2504 * Quantum wait time after principal node departs.
2505 */
2506static uint32_t
2507quantum_interval_principal_departed_wait_time(
2508 as_clustering_quantum_fault* fault)
2509{
2510 // Anticipate an incoming join request from other orphaned cluster members.
2511 return MIN(quantum_interval(),
2512 as_hb_node_timeout_get()
2513 + 2 * quantum_interval_hb_fault_comm_delay()
2514 + MAX(quantum_interval() / 4,
2515 quantum_interval_join_accepted_wait_time(fault)));
2516}
2517
2518/**
2519 * Quantum wait time after seeing a cluster that might send us a join request.
2520 */
2521static uint32_t
2522quantum_interval_inbound_merge_candidate_wait_time(
2523 as_clustering_quantum_fault* fault)
2524{
2525 return quantum_interval();
2526}
2527
2528/**
2529 * Quantum wait time after a cluster member has been orphaned.
2530 */
2531static uint32_t
2532quantum_interval_member_orphaned_wait_time(as_clustering_quantum_fault* fault)
2533{
2534 return quantum_interval();
2535}
2536
2537/**
2538 * Marks the current quantum interval as skipped. A kludge to allow quantum to
2539 * allow quantum interval generator to mark quantum intervals as postponed.
2540 */
2541static void
2542quantum_interval_mark_postponed()
2543{
2544 CLUSTERING_LOCK();
2545 g_quantum_interval_generator.is_interval_postponed = true;
2546 CLUSTERING_UNLOCK();
2547}
2548
2549/**
2550 * Update the vtable for a fault.
2551 */
2552static void
2553quantum_interval_vtable_update(as_clustering_quantum_fault_type type,
2554 char *fault_log_str, as_clustering_quantum_fault_wait_fn wait_fn)
2555{
2556 CLUSTERING_LOCK();
2557 g_quantum_interval_generator.vtable[type].fault_log_str = fault_log_str;
2558 g_quantum_interval_generator.vtable[type].wait_fn = wait_fn;
2559 CLUSTERING_UNLOCK();
2560}
2561
2562/**
2563 * Initialize quantum interval generator.
2564 */
2565static void
2566quantum_interval_generator_init()
2567{
2568 CLUSTERING_LOCK();
2569 memset(&g_quantum_interval_generator, 0,
2570 sizeof(g_quantum_interval_generator));
2571 g_quantum_interval_generator.last_quantum_start_time = cf_getms();
2572 g_quantum_interval_generator.last_quantum_interval = quantum_interval();
2573
2574 // Initialize the vtable.
2575 quantum_interval_vtable_update(QUANTUM_FAULT_NODE_ARRIVED, "node arrived",
2576 quantum_interval_node_arrived_wait_time);
2577 quantum_interval_vtable_update(QUANTUM_FAULT_NODE_DEPARTED, "node departed",
2578 quantum_interval_node_departed_wait_time);
2579 quantum_interval_vtable_update(QUANTUM_FAULT_PRINCIPAL_DEPARTED,
2580 "principal departed",
2581 quantum_interval_principal_departed_wait_time);
2582 quantum_interval_vtable_update(QUANTUM_FAULT_PEER_ADJACENCY_CHANGED,
2583 "peer adjacency changed",
2584 quantum_interval_peer_adjacency_changed_wait_time);
2585 quantum_interval_vtable_update(QUANTUM_FAULT_JOIN_ACCEPTED,
2586 "join request accepted", quantum_interval_join_accepted_wait_time);
2587 quantum_interval_vtable_update(QUANTUM_FAULT_INBOUND_MERGE_CANDIDATE_SEEN,
2588 "merge candidate seen",
2589 quantum_interval_inbound_merge_candidate_wait_time);
2590 quantum_interval_vtable_update(QUANTUM_FAULT_CLUSTER_MEMBER_ORPHANED,
2591 "member orphaned", quantum_interval_member_orphaned_wait_time);
2592
2593 CLUSTERING_UNLOCK();
2594}
2595
2596/**
2597 * Get the earliest possible monotonic clock time the next quantum interval can
2598 * start.
2599 *
2600 * Start quantum interval after the last update to any one of adjacency,
2601 * pending_join_requests , neighboring_principals. The heuristic is that these
2602 * should be stable to initiate cluster merge / join or cluster formation
2603 * requests.
2604 */
2605static cf_clock
2606quantum_interval_earliest_start_time()
2607{
2608 CLUSTERING_LOCK();
2609 cf_clock fault_event_time = 0;
2610 for (int i = 0; i < QUANTUM_FAULT_TYPE_SENTINEL; i++) {
2611 if (g_quantum_interval_generator.fault[i].event_ts) {
2612 fault_event_time = MAX(fault_event_time,
2613 g_quantum_interval_generator.fault[i].event_ts
2614 + g_quantum_interval_generator.vtable[i].wait_fn(
2615 &g_quantum_interval_generator.fault[i]));
2616 }
2617
2618 DETAIL("Fault:%s event_ts:%"PRIu64,
2619 g_quantum_interval_generator.vtable[i].fault_log_str,
2620 g_quantum_interval_generator.fault[i].event_ts);
2621 }
2622
2623 DETAIL("Last Quantum interval:%"PRIu64,
2624 g_quantum_interval_generator.last_quantum_start_time);
2625
2626 cf_clock start_time = g_quantum_interval_generator.last_quantum_start_time
2627 + quantum_interval();
2628 if (fault_event_time) {
2629 // Ensure we have at least 1/2 quantum interval of separation between
2630 // quantum intervals to give chance to multiple fault events that are
2631 // resonably close in time.
2632 start_time = MAX(
2633 g_quantum_interval_generator.last_quantum_start_time
2634 + quantum_interval() / 2, fault_event_time);
2635 }
2636 CLUSTERING_UNLOCK();
2637
2638 return start_time;
2639}
2640
2641/**
2642 * Reset quantum interval fault.
2643 * @param fault_type the fault type.
2644 */
2645static void
2646quantum_interval_fault_reset(as_clustering_quantum_fault_type fault_type)
2647{
2648 CLUSTERING_LOCK();
2649 memset(&g_quantum_interval_generator.fault[fault_type], 0,
2650 sizeof(g_quantum_interval_generator.fault[fault_type]));
2651 CLUSTERING_UNLOCK();
2652}
2653
2654/**
2655 * Update a fault event based on the current fault ts.
2656 * @param fault the fault to update.
2657 * @param fault_ts the new fault timestamp
2658 * @param src_nodeid the fault causing nodeid, 0 if the nodeid is not known.
2659 */
2660static void
2661quantum_interval_fault_update(as_clustering_quantum_fault_type fault_type,
2662 cf_clock fault_ts, cf_node src_nodeid)
2663{
2664 CLUSTERING_LOCK();
2665 as_clustering_quantum_fault* fault =
2666 &g_quantum_interval_generator.fault[fault_type];
2667 if (fault->event_ts == 0
2668 || fault_ts - fault->event_ts > quantum_interval() / 2) {
2669 // Fault event detected first time in this quantum or we are seeing the
2670 // effect of a different event more than half quantum apart.
2671 fault->event_ts = fault_ts;
2672 DETAIL("updated '%s' fault with ts %"PRIu64" for node %"PRIx64,
2673 g_quantum_interval_generator.vtable[fault_type].fault_log_str, fault_ts, src_nodeid);
2674 }
2675
2676 fault->last_event_ts = fault_ts;
2677 CLUSTERING_UNLOCK();
2678}
2679
2680/**
2681 * Reset the state for the next quantum interval.
2682 */
2683static void
2684quantum_interval_generator_reset(cf_clock last_quantum_start_time)
2685{
2686 CLUSTERING_LOCK();
2687 if (!g_quantum_interval_generator.is_interval_postponed) {
2688 // Update last quantum interval.
2689 g_quantum_interval_generator.last_quantum_interval = MAX(0,
2690 last_quantum_start_time
2691 - g_quantum_interval_generator.last_quantum_start_time);
2692
2693 g_quantum_interval_generator.last_quantum_start_time =
2694 last_quantum_start_time;
2695 for (int i = 0; i < QUANTUM_FAULT_TYPE_SENTINEL; i++) {
2696 quantum_interval_fault_reset(i);
2697 }
2698 }
2699 g_quantum_interval_generator.is_interval_postponed = false;
2700
2701 CLUSTERING_UNLOCK();
2702}
2703
2704/**
2705 * Handle timer event and generate a quantum internal event if required.
2706 */
2707static void
2708quantum_interval_generator_timer_event_handle(
2709 as_clustering_internal_event* timer_event)
2710{
2711 CLUSTERING_LOCK();
2712 cf_clock now = cf_getms();
2713
2714 cf_clock earliest_quantum_start_time =
2715 quantum_interval_earliest_start_time();
2716
2717 cf_clock expected_quantum_start_time =
2718 g_quantum_interval_generator.last_quantum_start_time
2719 + g_quantum_interval_generator.last_quantum_interval;
2720
2721 // Provide a buffer for current quantum interval to finish gracefully as
2722 // long as it is less than half a quantum interval.
2723 cf_clock quantum_wait_buffer = MIN(
2724 earliest_quantum_start_time > expected_quantum_start_time ?
2725 earliest_quantum_start_time - expected_quantum_start_time :
2726 0, g_quantum_interval_generator.last_quantum_interval / 2);
2727
2728 // Fire quantum interval start event if it is time, or if we have skipped
2729 // quantum interval start for more that the max skip number of intervals.
2730 // Add a buffer of wait time to ensure we wait a bit more if we can cover
2731 // the waiting time.
2732 bool is_skippable = g_quantum_interval_generator.last_quantum_start_time
2733 + (quantum_interval_skip_max() + 1)
2734 * g_quantum_interval_generator.last_quantum_interval
2735 + quantum_wait_buffer > now;
2736 bool fire_quantum_event = earliest_quantum_start_time <= now
2737 || !is_skippable;
2738 CLUSTERING_UNLOCK();
2739
2740 if (fire_quantum_event) {
2741 as_clustering_internal_event timer_event;
2742 memset(&timer_event, 0, sizeof(timer_event));
2743 timer_event.type = AS_CLUSTERING_INTERNAL_EVENT_QUANTUM_INTERVAL_START;
2744 timer_event.quantum_interval_is_skippable = is_skippable;
2745 internal_event_dispatch(&timer_event);
2746
2747 // Reset for next interval generation.
2748 quantum_interval_generator_reset(now);
2749 }
2750}
2751
2752/**
2753 * Check if the interval generator has seen an adjacency fault in the current
2754 * quantum interval.
2755 * @return true if the quantum interval generator has seen an adjacency fault,
2756 * false otherwise.
2757 */
2758static bool
2759quantum_interval_is_adjacency_fault_seen()
2760{
2761 CLUSTERING_LOCK();
2762 bool is_fault_seen =
2763 g_quantum_interval_generator.fault[QUANTUM_FAULT_NODE_ARRIVED].event_ts
2764 || g_quantum_interval_generator.fault[QUANTUM_FAULT_NODE_DEPARTED].event_ts
2765 || g_quantum_interval_generator.fault[QUANTUM_FAULT_PRINCIPAL_DEPARTED].event_ts;
2766 CLUSTERING_UNLOCK();
2767 return is_fault_seen;
2768}
2769
2770/**
2771 * Check if the interval generator has seen a peer node adjacency changed fault
2772 * in current quantum interval.
2773 * @return true if the quantum interval generator has seen a peer node adjacency
2774 * changed fault,
2775 * false otherwise.
2776 */
2777static bool
2778quantum_interval_is_peer_adjacency_fault_seen()
2779{
2780 CLUSTERING_LOCK();
2781 bool is_fault_seen =
2782 g_quantum_interval_generator.fault[QUANTUM_FAULT_PEER_ADJACENCY_CHANGED].event_ts;
2783 CLUSTERING_UNLOCK();
2784 return is_fault_seen;
2785}
2786
2787/**
2788 * Update the fault time for this quantum on self heartbeat adjacency list
2789 * change.
2790 */
2791static void
2792quantum_interval_generator_hb_event_handle(
2793 as_clustering_internal_event* hb_event)
2794{
2795 CLUSTERING_LOCK();
2796
2797 cf_clock min_event_time[AS_HB_NODE_EVENT_SENTINEL];
2798 cf_clock min_event_node[AS_HB_NODE_EVENT_SENTINEL];
2799
2800 memset(min_event_time, 0, sizeof(min_event_time));
2801 memset(min_event_node, 0, sizeof(min_event_node));
2802
2803 as_hb_event_node* events = hb_event->hb_events;
2804 for (int i = 0; i < hb_event->hb_n_events; i++) {
2805 if (min_event_time[events[i].evt] == 0
2806 || min_event_time[events[i].evt] > events[i].event_time) {
2807 min_event_time[events[i].evt] = events[i].event_time;
2808 min_event_node[events[i].evt] = events[i].nodeid;
2809 }
2810
2811 if (events[i].evt == AS_HB_NODE_DEPART
2812 && clustering_is_our_principal(events[i].nodeid)) {
2813 quantum_interval_fault_update(QUANTUM_FAULT_PRINCIPAL_DEPARTED,
2814 events[i].event_time, events[i].nodeid);
2815 }
2816 }
2817
2818 for (int i = 0; i < AS_HB_NODE_EVENT_SENTINEL; i++) {
2819 if (min_event_time[i]) {
2820 switch (i) {
2821 case AS_HB_NODE_ARRIVE:
2822 quantum_interval_fault_update(QUANTUM_FAULT_NODE_ARRIVED,
2823 min_event_time[i], min_event_node[i]);
2824 break;
2825 case AS_HB_NODE_DEPART:
2826 quantum_interval_fault_update(QUANTUM_FAULT_NODE_DEPARTED,
2827 min_event_time[i], min_event_node[i]);
2828 break;
2829 case AS_HB_NODE_ADJACENCY_CHANGED:
2830 if (clustering_is_cluster_member(min_event_node[i])) {
2831 quantum_interval_fault_update(
2832 QUANTUM_FAULT_PEER_ADJACENCY_CHANGED,
2833 min_event_time[i], min_event_node[i]);
2834 }
2835 break;
2836 default:
2837 break;
2838 }
2839
2840 }
2841 }
2842 CLUSTERING_UNLOCK();
2843}
2844
2845/**
2846 * Update the fault time for this quantum on clustering information for an
2847 * adjacent node change. Assumes the node's plugin data is not obsolete.
2848 */
2849static void
2850quantum_interval_generator_hb_plugin_data_changed_handle(
2851 as_clustering_internal_event* change_event)
2852{
2853 CLUSTERING_LOCK();
2854
2855 if (clustering_hb_plugin_data_is_obsolete(
2856 g_register.cluster_modified_hlc_ts,
2857 g_register.cluster_modified_time, change_event->plugin_data->data,
2858 change_event->plugin_data->data_size,
2859 change_event->plugin_data_changed_ts,
2860 &change_event->plugin_data_changed_hlc_ts)) {
2861 // The plugin data is obsolete. Can't take decisions based on it.
2862 goto Exit;
2863 }
2864
2865 // Get the changed node's succession list, cluster key. All the fields
2866 // should be present since the obsolete check also checked for fields being
2867 // valid.
2868 cf_node* succession_list_p = clustering_hb_plugin_succession_get(
2869 change_event->plugin_data->data,
2870 change_event->plugin_data->data_size);
2871 uint32_t* succession_list_length_p =
2872 clustering_hb_plugin_succession_length_get(
2873 change_event->plugin_data->data,
2874 change_event->plugin_data->data_size);
2875
2876 if (*succession_list_length_p > 0
2877 && !clustering_is_our_principal(succession_list_p[0])
2878 && clustering_is_principal()) {
2879 if (succession_list_p[0] < config_self_nodeid_get()) {
2880 // We are seeing a new principal who could potentially merge with
2881 // this cluster.
2882 if (g_quantum_interval_generator.fault[QUANTUM_FAULT_INBOUND_MERGE_CANDIDATE_SEEN].event_ts
2883 != 1) {
2884 quantum_interval_fault_update(
2885 QUANTUM_FAULT_INBOUND_MERGE_CANDIDATE_SEEN, cf_getms(),
2886 change_event->plugin_data_changed_nodeid);
2887 }
2888 }
2889 else {
2890 // We see a cluster with higher nodeid and most probably we will not
2891 // be the principal of the merged cluster. Reset the fault
2892 // timestamp, however set it to 1 to differentiate between no fault
2893 // and a fault to be ingnored in this quantum interval. A value of 1
2894 // for practical purposes will never push the quantum interval
2895 // forward.
2896 quantum_interval_fault_update(
2897 QUANTUM_FAULT_INBOUND_MERGE_CANDIDATE_SEEN, 1,
2898 change_event->plugin_data_changed_nodeid);
2899 }
2900 }
2901 else {
2902 if (clustering_is_principal() && *succession_list_length_p == 0
2903 && vector_find(&g_register.succession_list,
2904 &change_event->plugin_data_changed_nodeid) >= 0) {
2905 // One of our cluster members switched to orphan state. Most likely
2906 // a quick restart.
2907 quantum_interval_fault_update(QUANTUM_FAULT_CLUSTER_MEMBER_ORPHANED,
2908 cf_getms(), change_event->plugin_data_changed_nodeid);
2909 }
2910 else {
2911 // A node becoming an orphan node or seeing a succession with our
2912 // principal does not mean we have seen a new cluster.
2913 }
2914 }
2915Exit:
2916 CLUSTERING_UNLOCK();
2917}
2918
2919/**
2920 * Update the fault time for this quantum on self heartbeat adjacency list
2921 * change.
2922 */
2923static void
2924quantum_interval_generator_join_request_accepted_handle(
2925 as_clustering_internal_event* join_request_event)
2926{
2927 quantum_interval_fault_update(QUANTUM_FAULT_JOIN_ACCEPTED, cf_getms(),
2928 join_request_event->join_request_source_nodeid);
2929}
2930
2931/**
2932 * Dispatch internal clustering events for the quantum interval generator.
2933 */
2934static void
2935quantum_interval_generator_event_dispatch(as_clustering_internal_event* event)
2936{
2937 switch (event->type) {
2938 case AS_CLUSTERING_INTERNAL_EVENT_TIMER:
2939 quantum_interval_generator_timer_event_handle(event);
2940 break;
2941 case AS_CLUSTERING_INTERNAL_EVENT_HB:
2942 quantum_interval_generator_hb_event_handle(event);
2943 break;
2944 case AS_CLUSTERING_INTERNAL_EVENT_HB_PLUGIN_DATA_CHANGED:
2945 quantum_interval_generator_hb_plugin_data_changed_handle(event);
2946 break;
2947 case AS_CLUSTERING_INTERNAL_EVENT_JOIN_REQUEST_ACCEPTED:
2948 quantum_interval_generator_join_request_accepted_handle(event);
2949 break;
2950 default:
2951 break;
2952 }
2953}
2954
2955/**
2956 * Start quantum interval generator.
2957 */
2958static void
2959quantum_interval_generator_start()
2960{
2961 CLUSTERING_LOCK();
2962 g_quantum_interval_generator.last_quantum_start_time = cf_getms();
2963 CLUSTERING_UNLOCK();
2964}
2965
2966/*
2967 * ----------------------------------------------------------------------------
2968 * Clustering common
2969 * ----------------------------------------------------------------------------
2970 */
2971
2972/**
2973 * Generate a new random and most likely a unique cluster key.
2974 * @param current_cluster_key current cluster key to prevent collision.
2975 * @return randomly generated cluster key.
2976 */
2977static as_cluster_key
2978clustering_cluster_key_generate(as_cluster_key current_cluster_key)
2979{
2980 // Generate one uuid and use this for the cluster key
2981 as_cluster_key cluster_key = 0;
2982
2983 // Generate a non-zero cluster key that fits in 6 bytes.
2984 while ((cluster_key = (cf_get_rand64() >> 16)) == 0
2985 || cluster_key == current_cluster_key) {
2986 ;
2987 }
2988
2989 return cluster_key;
2990}
2991
2992/**
2993 * Indicates if this node is an orphan. A node is deemed orphan if it is not a
2994 * memeber of any cluster.
2995 */
2996static bool
2997clustering_is_orphan()
2998{
2999 CLUSTERING_LOCK();
3000
3001 bool is_orphan = cf_vector_size(&g_register.succession_list) <= 0
3002 || g_register.cluster_key == 0;
3003
3004 CLUSTERING_UNLOCK();
3005
3006 return is_orphan;
3007}
3008
3009/**
3010 * Return the principal node for current cluster.
3011 * @param principal (output) the current principal for the cluster.
3012 * @return 0 if there is a valid principal, -1 if the node is in orphan state
3013 * and there is no valid principal.
3014 */
3015static int
3016clustering_principal_get(cf_node* principal)
3017{
3018 CLUSTERING_LOCK();
3019 int rv = -1;
3020
3021 if (cf_vector_get(&g_register.succession_list, 0, principal) == 0) {
3022 rv = 0;
3023 }
3024
3025 CLUSTERING_UNLOCK();
3026
3027 return rv;
3028}
3029
3030/**
3031 * Indicates if this node is the principal for its cluster.
3032 */
3033static bool
3034clustering_is_principal()
3035{
3036 CLUSTERING_LOCK();
3037 cf_node current_principal;
3038
3039 bool is_principal = clustering_principal_get(&current_principal) == 0
3040 && current_principal == config_self_nodeid_get();
3041
3042 CLUSTERING_UNLOCK();
3043
3044 return is_principal;
3045}
3046
3047/**
3048 * Indicates if input node is this node's principal. Input node can be self node
3049 * as well.
3050 */
3051static bool
3052clustering_is_our_principal(cf_node nodeid)
3053{
3054 CLUSTERING_LOCK();
3055 cf_node current_principal;
3056
3057 bool is_principal = clustering_principal_get(&current_principal) == 0
3058 && current_principal == nodeid;
3059
3060 CLUSTERING_UNLOCK();
3061
3062 return is_principal;
3063}
3064
3065/**
3066 * Indicates if a node is our cluster member.
3067 */
3068static bool
3069clustering_is_cluster_member(cf_node nodeid)
3070{
3071 CLUSTERING_LOCK();
3072 bool is_member = vector_find(&g_register.succession_list, &nodeid) >= 0;
3073 CLUSTERING_UNLOCK();
3074 return is_member;
3075}
3076
3077/**
3078 * Indicates if the input node is present in a succession list.
3079 * @param nodeid the nodeid to search.
3080 * @param succession_list the succession list.
3081 * @param succession_list_length the length of the succession list.
3082 * @return true if the node is present in the succession list, false otherwise.
3083 */
3084static bool
3085clustering_is_node_in_succession(cf_node nodeid, cf_node* succession_list,
3086 int succession_list_length)
3087{
3088 for (int i = 0; i < succession_list_length; i++) {
3089 if (succession_list[i] == nodeid) {
3090 return true;
3091 }
3092 }
3093
3094 return false;
3095}
3096
3097/**
3098 * Indicates if the input node can be accepted as this a paxos proposer. We can
3099 * accept the new node as our principal if we are in the orphan state or if the
3100 * input node is already our principal.
3101 *
3102 * Note: In case we send a join request to a node with a lower node id, input
3103 * node's nodeid can be less than our nodeid. This is still valid as the
3104 * proposer who will hand over the principalship to us once paxos round is over.
3105 *
3106 * @param nodeid the nodeid of the proposer to check.
3107 * @return true if this input node is an acceptable proposer.
3108 */
3109static bool
3110clustering_can_accept_as_proposer(cf_node nodeid)
3111{
3112 return clustering_is_orphan() || clustering_is_our_principal(nodeid);
3113}
3114
3115/**
3116 * Plugin data iterate function that finds and collects neighboring principals,
3117 * excluding current principal if any .
3118 */
3119static void
3120clustering_neighboring_principals_find(cf_node nodeid, void* plugin_data,
3121 size_t plugin_data_size, cf_clock recv_monotonic_ts,
3122 as_hlc_msg_timestamp* msg_hlc_ts, void* udata)
3123{
3124 cf_vector* neighboring_principals = (cf_vector*)udata;
3125
3126 CLUSTERING_LOCK();
3127
3128 // For determining neighboring principal it is alright if this data is
3129 // within two heartbeat intervals. So obsolete check has the timestamps as
3130 // zero. This way we will not reject principals that have nothing to do with
3131 // our cluster changes.
3132 if (recv_monotonic_ts + 2 * as_hb_tx_interval_get() >= cf_getms()
3133 && !clustering_hb_plugin_data_is_obsolete(0, 0, plugin_data,
3134 plugin_data_size, recv_monotonic_ts, msg_hlc_ts)) {
3135 cf_node* succession_list = clustering_hb_plugin_succession_get(
3136 plugin_data, plugin_data_size);
3137
3138 uint32_t* succession_list_length_p =
3139 clustering_hb_plugin_succession_length_get(plugin_data,
3140 plugin_data_size);
3141
3142 if (succession_list != NULL && succession_list_length_p != NULL
3143 && *succession_list_length_p > 0
3144 && succession_list[0] != config_self_nodeid_get()) {
3145 cf_vector_append_unique(neighboring_principals,
3146 &succession_list[0]);
3147 }
3148 }
3149 else {
3150 DETAIL(
3151 "neighboring principal check skipped - found obsolete plugin data for node %"PRIx64,
3152 nodeid);
3153 }
3154
3155 CLUSTERING_UNLOCK();
3156}
3157
3158/**
3159 * Get a list of adjacent principal nodes ordered by descending nodeids.
3160 */
3161static void
3162clustering_neighboring_principals_get(cf_vector* neighboring_principals)
3163{
3164 CLUSTERING_LOCK();
3165
3166 // Use a single iteration over the clustering data received via the
3167 // heartbeats instead of individual calls to get a consistent view and avoid
3168 // small lock and releases.
3169 as_hb_plugin_data_iterate_all(AS_HB_PLUGIN_CLUSTERING,
3170 clustering_neighboring_principals_find, neighboring_principals);
3171
3172 vector_sort_unique(neighboring_principals, cf_node_compare_desc);
3173
3174 CLUSTERING_UNLOCK();
3175}
3176
3177/**
3178 * Find dead nodes in current succession list.
3179 */
3180static void
3181clustering_dead_nodes_find(cf_vector* dead_nodes)
3182{
3183 CLUSTERING_LOCK();
3184
3185 cf_vector* succession_list_p = &g_register.succession_list;
3186 int succession_list_count = cf_vector_size(succession_list_p);
3187 for (int i = 0; i < succession_list_count; i++) {
3188 // No null check required since we are iterating under a lock and within
3189 // vector bounds.
3190 cf_node cluster_member_nodeid = *((cf_node*)cf_vector_getp(
3191 succession_list_p, i));
3192
3193 if (!as_hb_is_alive(cluster_member_nodeid)) {
3194 cf_vector_append(dead_nodes, &cluster_member_nodeid);
3195 }
3196 }
3197
3198 CLUSTERING_UNLOCK();
3199}
3200
3201/**
3202 * Indicates if a node is faulty. A node in the succecssion list deemed faulty
3203 * - if the node is alive and it reports to be an orphan or is part of some
3204 * other cluster.
3205 * - if the node is alive its clustering protocol identifier does not match this
3206 * node's clustering protocol identifier.
3207 */
3208static bool
3209clustering_node_is_faulty(cf_node nodeid)
3210{
3211 if (nodeid == config_self_nodeid_get()) {
3212 // Self node is never faulty wrt clustering.
3213 return false;
3214 }
3215
3216 CLUSTERING_LOCK();
3217 bool is_faulty = false;
3218 as_hlc_msg_timestamp hb_msg_hlc_ts;
3219 cf_clock msg_recv_ts = 0;
3220 as_hb_plugin_node_data plugin_data = { 0 };
3221
3222 if (clustering_hb_plugin_data_get(nodeid, &plugin_data, &hb_msg_hlc_ts,
3223 &msg_recv_ts) != 0
3224 || clustering_hb_plugin_data_is_obsolete(
3225 g_register.cluster_modified_hlc_ts,
3226 g_register.cluster_modified_time, plugin_data.data,
3227 plugin_data.data_size, msg_recv_ts, &hb_msg_hlc_ts)) {
3228 INFO(
3229 "faulty check skipped - found obsolete plugin data for node %"PRIx64,
3230 nodeid);
3231 is_faulty = false;
3232 goto Exit;
3233 }
3234
3235 // We have clustering data from the node after the current cluster change.
3236 // Compare protocol identifier, clusterkey, and succession.
3237 as_cluster_proto_identifier* proto_p = clustering_hb_plugin_proto_get(
3238 plugin_data.data, plugin_data.data_size);
3239
3240 if (proto_p == NULL
3241 || !clustering_versions_are_compatible(*proto_p,
3242 clustering_protocol_identifier_get())) {
3243 DEBUG("for node %"PRIx64" protocol version mismatch - expected: %"PRIx32" but was : %"PRIx32,
3244 nodeid, clustering_protocol_identifier_get(),
3245 proto_p != NULL ? *proto_p : 0);
3246 is_faulty = true;
3247 goto Exit;
3248 }
3249
3250 as_cluster_key* cluster_key_p = clustering_hb_plugin_cluster_key_get(
3251 plugin_data.data, plugin_data.data_size);
3252 if (cluster_key_p == NULL || *cluster_key_p != g_register.cluster_key) {
3253 DEBUG("for node %"PRIx64" cluster key mismatch - expected: %"PRIx64" but was : %"PRIx64,
3254 nodeid, g_register.cluster_key, cluster_key_p != NULL ? *cluster_key_p : 0);
3255 is_faulty = true;
3256 goto Exit;
3257 }
3258
3259 // Check succession list just to be sure.
3260 // We have clustering data from the node after the current cluster change.
3261 cf_node* succession_list = clustering_hb_plugin_succession_get(
3262 plugin_data.data, plugin_data.data_size);
3263
3264 uint32_t* succession_list_length_p =
3265 clustering_hb_plugin_succession_length_get(plugin_data.data,
3266 plugin_data.data_size);
3267
3268 if (succession_list == NULL || succession_list_length_p == NULL
3269 || !clustering_hb_succession_list_matches(succession_list,
3270 *succession_list_length_p, &g_register.succession_list)) {
3271 INFO("for node %"PRIx64" succession list mismatch", nodeid);
3272
3273 log_cf_node_vector("self succession list:", &g_register.succession_list,
3274 CF_INFO);
3275
3276 if (succession_list) {
3277 log_cf_node_array("node succession list:", succession_list,
3278 succession_list && succession_list_length_p ?
3279 *succession_list_length_p : 0, CF_INFO);
3280 }
3281 else {
3282 INFO("node succession list: (empty)");
3283 }
3284
3285 is_faulty = true;
3286 goto Exit;
3287 }
3288
3289Exit:
3290 CLUSTERING_UNLOCK();
3291 return is_faulty;
3292}
3293
3294/**
3295 * Find "faulty" nodes in current succession list.
3296 */
3297static void
3298clustering_faulty_nodes_find(cf_vector* faulty_nodes)
3299{
3300 CLUSTERING_LOCK();
3301
3302 if (clustering_is_orphan()) {
3303 goto Exit;
3304 }
3305
3306 cf_vector* succession_list_p = &g_register.succession_list;
3307 int succession_list_count = cf_vector_size(succession_list_p);
3308 for (int i = 0; i < succession_list_count; i++) {
3309 // No null check required since we are iterating under a lock and within
3310 // vector bounds.
3311 cf_node cluster_member_nodeid = *((cf_node*)cf_vector_getp(
3312 succession_list_p, i));
3313 if (clustering_node_is_faulty(cluster_member_nodeid)) {
3314 cf_vector_append(faulty_nodes, &cluster_member_nodeid);
3315 }
3316 }
3317
3318Exit:
3319 CLUSTERING_UNLOCK();
3320}
3321
3322/**
3323 * Indicates if a node is in sync with this node's cluster. A node in the
3324 * succecssion list is deemed in sync if the node is alive and it reports to be
3325 * in the same cluster via its heartbeats.
3326 */
3327static bool
3328clustering_node_is_sync(cf_node nodeid)
3329{
3330 if (nodeid == config_self_nodeid_get()) {
3331 // Self node is always in sync wrt clustering.
3332 return true;
3333 }
3334
3335 CLUSTERING_LOCK();
3336 bool is_sync = false;
3337 as_hlc_msg_timestamp hb_msg_hlc_ts;
3338 cf_clock msg_recv_ts = 0;
3339 as_hb_plugin_node_data plugin_data = { 0 };
3340 bool data_exists =
3341 clustering_hb_plugin_data_get(nodeid, &plugin_data, &hb_msg_hlc_ts,
3342 &msg_recv_ts) == 0;
3343
3344 // Latest valid plugin data is ok as long as other checks are met. Hence the
3345 // timestamps are zero.
3346 if (!data_exists || msg_recv_ts + 2 * as_hb_tx_interval_get() < cf_getms()
3347 || clustering_hb_plugin_data_is_obsolete(0, 0, plugin_data.data,
3348 plugin_data.data_size, msg_recv_ts, &hb_msg_hlc_ts)) {
3349 is_sync = false;
3350 goto Exit;
3351 }
3352
3353 // We have clustering data from the node after the current cluster change.
3354 // Compare protocol identifier, clusterkey, and succession.
3355 as_cluster_proto_identifier* proto_p = clustering_hb_plugin_proto_get(
3356 plugin_data.data, plugin_data.data_size);
3357
3358 if (proto_p == NULL
3359 || !clustering_versions_are_compatible(*proto_p,
3360 clustering_protocol_identifier_get())) {
3361 DEBUG(
3362 "for node %"PRIx64" protocol version mismatch - expected: %"PRIx32" but was : %"PRIx32,
3363 nodeid, clustering_protocol_identifier_get(),
3364 proto_p != NULL ? *proto_p : 0);
3365 is_sync = false;
3366 goto Exit;
3367 }
3368
3369 as_cluster_key* cluster_key_p = clustering_hb_plugin_cluster_key_get(
3370 plugin_data.data, plugin_data.data_size);
3371 if (cluster_key_p == NULL || *cluster_key_p != g_register.cluster_key) {
3372 DEBUG(
3373 "for node %"PRIx64" cluster key mismatch - expected: %"PRIx64" but was : %"PRIx64,
3374 nodeid, g_register.cluster_key, cluster_key_p != NULL ? *cluster_key_p : 0);
3375 is_sync = false;
3376 goto Exit;
3377 }
3378
3379 // Check succession list just to be sure.
3380 // We have clustering data from the node after the current cluster change.
3381 cf_node* succession_list = clustering_hb_plugin_succession_get(
3382 plugin_data.data, plugin_data.data_size);
3383
3384 uint32_t* succession_list_length_p =
3385 clustering_hb_plugin_succession_length_get(plugin_data.data,
3386 plugin_data.data_size);
3387
3388 if (succession_list == NULL || succession_list_length_p == NULL
3389 || !clustering_hb_succession_list_matches(succession_list,
3390 *succession_list_length_p, &g_register.succession_list)) {
3391 DEBUG("for node %"PRIx64" succession list mismatch", nodeid);
3392
3393 log_cf_node_vector("self succession list:", &g_register.succession_list,
3394 CF_DEBUG);
3395
3396 if (succession_list) {
3397 log_cf_node_array("node succession list:", succession_list,
3398 succession_list && succession_list_length_p ?
3399 *succession_list_length_p : 0, CF_DEBUG);
3400 }
3401 else {
3402 DEBUG("node succession list: (empty)");
3403 }
3404
3405 is_sync = false;
3406 goto Exit;
3407 }
3408
3409 is_sync = true;
3410
3411Exit:
3412 CLUSTERING_UNLOCK();
3413 return is_sync;
3414}
3415
3416/**
3417 * Find orphan nodes using clustering data for each node in the heartbeat's
3418 * adjacency list.
3419 */
3420static void
3421clustering_orphan_nodes_find(cf_node nodeid, void* plugin_data,
3422 size_t plugin_data_size, cf_clock recv_monotonic_ts,
3423 as_hlc_msg_timestamp* msg_hlc_ts, void* udata)
3424{
3425 cf_vector* orphans = udata;
3426
3427 CLUSTERING_LOCK();
3428
3429 // For determining orphan it is alright if this data is within two heartbeat
3430 // intervals. So obsolete check has the timestamps as zero.
3431 if (recv_monotonic_ts + 2 * as_hb_tx_interval_get() >= cf_getms()
3432 && !clustering_hb_plugin_data_is_obsolete(0, 0, plugin_data,
3433 plugin_data_size, recv_monotonic_ts, msg_hlc_ts)) {
3434 if (clustering_hb_plugin_data_node_status(plugin_data, plugin_data_size)
3435 == AS_NODE_ORPHAN) {
3436 cf_vector_append(orphans, &nodeid);
3437 }
3438
3439 }
3440 else {
3441 DETAIL(
3442 "orphan check skipped - found obsolete plugin data for node %"PRIx64,
3443 nodeid);
3444 }
3445
3446 CLUSTERING_UNLOCK();
3447}
3448
3449/**
3450 * Get a list of neighboring nodes that are orphans. Does not include self node.
3451 */
3452static void
3453clustering_neighboring_orphans_get(cf_vector* neighboring_orphans)
3454{
3455 CLUSTERING_LOCK();
3456
3457 // Use a single iteration over the clustering data received via the
3458 // heartbeats instead of individual calls to get a consistent view and avoid
3459 // small lock and release.
3460 as_hb_plugin_data_iterate_all(AS_HB_PLUGIN_CLUSTERING,
3461 clustering_orphan_nodes_find, neighboring_orphans);
3462
3463 CLUSTERING_UNLOCK();
3464}
3465
3466/**
3467 * Find neighboring nodes using clustering data for each node in the heartbeat's
3468 * adjacency list.
3469 */
3470static void
3471clustering_neighboring_nodes_find(cf_node nodeid, void* plugin_data,
3472 size_t plugin_data_size, cf_clock recv_monotonic_ts,
3473 as_hlc_msg_timestamp* msg_hlc_ts, void* udata)
3474{
3475 cf_vector* nodes = udata;
3476 cf_vector_append(nodes, &nodeid);
3477}
3478
3479/**
3480 * Get a list of all neighboring nodes. Does not include self node.
3481 */
3482static void
3483clustering_neighboring_nodes_get(cf_vector* neighboring_nodes)
3484{
3485 CLUSTERING_LOCK();
3486
3487 // Use a single iteration over the clustering data received via the
3488 // heartbeats instead of individual calls to get a consistent view and avoid
3489 // small lock and release.
3490 as_hb_plugin_data_iterate_all(AS_HB_PLUGIN_CLUSTERING,
3491 clustering_neighboring_nodes_find, neighboring_nodes);
3492
3493 CLUSTERING_UNLOCK();
3494}
3495
3496/**
3497 * Evict nodes not forming a clique from the succession list.
3498 */
3499static uint32_t
3500clustering_succession_list_clique_evict(cf_vector* succession_list,
3501 char* evict_msg)
3502{
3503 uint32_t num_evicted = 0;
3504 if (g_config.clustering_config.clique_based_eviction_enabled) {
3505 // Remove nodes that do not form a clique.
3506 cf_vector* evicted_nodes = vector_stack_lockless_create(cf_node);
3507 as_hb_maximal_clique_evict(succession_list, evicted_nodes);
3508 num_evicted = cf_vector_size(evicted_nodes);
3509 log_cf_node_vector(evict_msg, evicted_nodes,
3510 num_evicted > 0 ? CF_INFO : CF_DEBUG);
3511
3512 vector_subtract(succession_list, evicted_nodes);
3513 cf_vector_destroy(evicted_nodes);
3514 }
3515 return num_evicted;
3516}
3517
3518/*
3519 * ----------------------------------------------------------------------------
3520 * Clustering network message functions
3521 * ----------------------------------------------------------------------------
3522 */
3523
3524/**
3525 * Fill common source node specific fields for the message.
3526 * @param msg the message to fill the source fields into.
3527 */
3528static void
3529msg_src_fields_fill(msg* msg)
3530{
3531 // Set the hb protocol id / version.
3532 msg_set_uint32(msg, AS_CLUSTERING_MSG_ID,
3533 clustering_protocol_identifier_get());
3534
3535 // Set the send timestamp
3536 msg_set_uint64(msg, AS_CLUSTERING_MSG_HLC_TIMESTAMP,
3537 as_hlc_timestamp_now());
3538}
3539
3540/**
3541 * Read the protocol identifier for this clustering message. These functions can
3542 * get called multiple times for a single message. Hence they do not increment
3543 * error counters.
3544 * @param msg the incoming message.
3545 * @param id the output id.
3546 * @return 0 if the type could be parsed -1 on failure.
3547 */
3548static int
3549msg_proto_id_get(msg* msg, uint32_t* id)
3550{
3551 if (msg_get_uint32(msg, AS_CLUSTERING_MSG_ID, id) != 0) {
3552 return -1;
3553 }
3554
3555 return 0;
3556}
3557
3558/**
3559 * Read the message type. These functions can get called multiple times for a
3560 * single message. Hence they do not increment error counters.
3561 * @param msg the incoming message.
3562 * @param type the output message type.
3563 * @return 0 if the type could be parsed -1 on failure.
3564 */
3565static int
3566msg_type_get(msg* msg, as_clustering_msg_type* type)
3567{
3568 if (msg_get_uint32(msg, AS_CLUSTERING_MSG_TYPE, type) != 0) {
3569 return -1;
3570 }
3571
3572 return 0;
3573}
3574
3575/**
3576 * Set the type for an outgoing message.
3577 * @param msg the outgoing message.
3578 * @param msg_type the type to set.
3579 */
3580static void
3581msg_type_set(msg* msg, as_clustering_msg_type msg_type)
3582{
3583 // Set the message type.
3584 msg_set_uint32(msg, AS_CLUSTERING_MSG_TYPE, msg_type);
3585}
3586
3587/**
3588 * Read the proposed principal field from the message.
3589 * @param msg the incoming message.
3590 * @param nodeid the output nodeid.
3591 * @return 0 if the type could be parsed -1 on failure.
3592 */
3593static int
3594msg_proposed_principal_get(msg* msg, cf_node* nodeid)
3595{
3596 if (msg_get_uint64(msg, AS_CLUSTERING_MSG_PROPOSED_PRINCIPAL, nodeid)
3597 != 0) {
3598 return -1;
3599 }
3600
3601 return 0;
3602}
3603
3604/**
3605 * Set the proposed principal field in the message.
3606 * @param msg the outgoing message.
3607 * @param nodeid the proposed principal nodeid.
3608 */
3609static void
3610msg_proposed_principal_set(msg* msg, cf_node nodeid)
3611{
3612 msg_set_uint64(msg, AS_CLUSTERING_MSG_PROPOSED_PRINCIPAL, nodeid);
3613}
3614
3615/**
3616 * Read the HLC send timestamp for the message. These functions can get called
3617 * multiple times for a single message. Hence they do not increment error
3618 * counters.
3619 * @param msg the incoming message.
3620 * @param send_ts the output hls timestamp.
3621 * @return 0 if the type could be parsed -1 on failure.
3622 */
3623static int
3624msg_send_ts_get(msg* msg, as_hlc_timestamp* send_ts)
3625{
3626 if (msg_get_uint64(msg, AS_CLUSTERING_MSG_HLC_TIMESTAMP, send_ts) != 0) {
3627 return -1;
3628 }
3629
3630 return 0;
3631}
3632
3633/**
3634 * Set the sequence number for an outgoing message.
3635 * @param msg the outgoing message.
3636 * @param sequence_number the sequence number to set.
3637 */
3638static void
3639msg_sequence_number_set(msg* msg, as_paxos_sequence_number sequence_number)
3640{
3641 // Set the message type.
3642 msg_set_uint64(msg, AS_CLUSTERING_MSG_SEQUENCE_NUMBER, sequence_number);
3643}
3644
3645/**
3646 * Read sequence number from the message.
3647 * @param msg the incoming message.
3648 * @param sequence_number the output sequence number.
3649 * @return 0 if the sequence number could be parsed -1 on failure.
3650 */
3651static int
3652msg_sequence_number_get(msg* msg, as_paxos_sequence_number* sequence_number)
3653{
3654 if (msg_get_uint64(msg, AS_CLUSTERING_MSG_SEQUENCE_NUMBER, sequence_number)
3655 != 0) {
3656 return -1;
3657 }
3658
3659 return 0;
3660}
3661
3662/**
3663 * Set the cluster key for an outgoing message field.
3664 * @param msg the outgoing message.
3665 * @param cluster_key the cluster key to set.
3666 * @param field the field to set the cluster key to.
3667 */
3668static void
3669msg_cluster_key_field_set(msg* msg, as_cluster_key cluster_key,
3670 as_clustering_msg_field field)
3671{
3672 msg_set_uint64(msg, field, cluster_key);
3673}
3674
3675/**
3676 * Set the cluster key for an outgoing message.
3677 * @param msg the outgoing message.
3678 * @param cluster_key the cluster key to set.
3679 */
3680static void
3681msg_cluster_key_set(msg* msg, as_cluster_key cluster_key)
3682{
3683 msg_cluster_key_field_set(msg, cluster_key, AS_CLUSTERING_MSG_CLUSTER_KEY);
3684}
3685
3686/**
3687 * Read cluster key from a message field.
3688 * @param msg the incoming message.
3689 * @param cluster_key the output cluster key.
3690 * @param field the field to set the cluster key to.
3691 * @return 0 if the cluster key could be parsed -1 on failure.
3692 */
3693static int
3694msg_cluster_key_field_get(msg* msg, as_cluster_key* cluster_key,
3695 as_clustering_msg_field field)
3696{
3697 if (msg_get_uint64(msg, field, cluster_key) != 0) {
3698 return -1;
3699 }
3700
3701 return 0;
3702}
3703
3704/**
3705 * Read cluster key from the message.
3706 * @param msg the incoming message.
3707 * @param cluster_key the output cluster key.
3708 * @return 0 if the cluster key could be parsed -1 on failure.
3709 */
3710static int
3711msg_cluster_key_get(msg* msg, as_cluster_key* cluster_key)
3712{
3713 return msg_cluster_key_field_get(msg, cluster_key,
3714 AS_CLUSTERING_MSG_CLUSTER_KEY);
3715}
3716
3717/**
3718 * Set the succession list for an outgoing message in a particular field.
3719 * @param msg the outgoing message.
3720 * @param succession_list the succession list to set.
3721 * @param field the field to set for the succession list.
3722 */
3723static void
3724msg_succession_list_field_set(msg* msg, cf_vector* succession_list,
3725 as_clustering_msg_field field)
3726
3727{
3728 int num_elements = cf_vector_size(succession_list);
3729 size_t buffer_size = num_elements * sizeof(cf_node);
3730 cf_node* succession_buffer = (cf_node*)BUFFER_ALLOC_OR_DIE(buffer_size);
3731
3732 for (int i = 0; i < num_elements; i++) {
3733 cf_vector_get(succession_list, i, &succession_buffer[i]);
3734 }
3735
3736 msg_set_buf(msg, field, (uint8_t*)succession_buffer, buffer_size,
3737 MSG_SET_COPY);
3738
3739 BUFFER_FREE(succession_buffer, buffer_size);
3740}
3741
3742/**
3743 * Set the succession list for an outgoing message.
3744 * @param msg the outgoing message.
3745 * @param succession_list the succession list to set.
3746 */
3747static void
3748msg_succession_list_set(msg* msg, cf_vector* succession_list)
3749{
3750 int num_elements = cf_vector_size(succession_list);
3751 if (num_elements <= 0) {
3752 // Empty succession list being sent. Definitely wrong.Something is amiss
3753 // let it through. The receiver will reject it anyways.
3754 WARNING("setting empty succession list");
3755 return;
3756 }
3757
3758 msg_succession_list_field_set(msg, succession_list,
3759 AS_CLUSTERING_MSG_SUCCESSION_LIST);
3760}
3761
3762/**
3763 * Read succession list from a message field.
3764 * @param msg the incoming message.
3765 * @param succession_list the output succession list.
3766 * @param field the field to read from.
3767 * @return 0 if the succession list could be parsed -1 on failure.
3768 */
3769static int
3770msg_succession_list_field_get(msg* msg, cf_vector* succession_list,
3771 as_clustering_msg_field field)
3772{
3773 vector_clear(succession_list);
3774 cf_node* succession_buffer;
3775 size_t buffer_size;
3776 if (msg_get_buf(msg, field, (uint8_t**)&succession_buffer, &buffer_size,
3777 MSG_GET_DIRECT) != 0) {
3778 // Empty succession list should not be allowed.
3779 return -1;
3780 }
3781
3782 // Correct adjacency list length.
3783 int num_elements = buffer_size / sizeof(cf_node);
3784
3785 for (int i = 0; i < num_elements; i++) {
3786 cf_vector_append(succession_list, &succession_buffer[i]);
3787 }
3788
3789 vector_sort_unique(succession_list, cf_node_compare_desc);
3790
3791 return 0;
3792}
3793
3794/**
3795 * Read succession list from the message.
3796 * @param msg the incoming message.
3797 * @param succession_list the output succession list.
3798 * @return 0 if the succession list could be parsed -1 on failure.
3799 */
3800static int
3801msg_succession_list_get(msg* msg, cf_vector* succession_list)
3802{
3803 return msg_succession_list_field_get(msg, succession_list,
3804 AS_CLUSTERING_MSG_SUCCESSION_LIST);
3805}
3806
3807/**
3808 * Get the paxos proposal id for message event.
3809 * @param event the message event.
3810 * @param proposal_id the paxos proposal id.
3811 * @return 0 if the type could be parsed -1 on failure.
3812 */
3813static int
3814msg_event_proposal_id_get(as_clustering_internal_event* event,
3815 as_paxos_proposal_id* proposal_id)
3816{
3817 if (msg_sequence_number_get(event->msg, &proposal_id->sequence_number)
3818 != 0) {
3819 return -1;
3820 }
3821 proposal_id->src_nodeid = event->msg_src_nodeid;
3822 return 0;
3823}
3824
3825/**
3826 * Get a network message object from the message pool with all common fields for
3827 * clustering, like protocol identifier, and hlc timestamp filled in.
3828 * @param type the type of the message.
3829 */
3830static msg*
3831msg_pool_get(as_clustering_msg_type type)
3832{
3833 msg* msg = as_fabric_msg_get(M_TYPE_CLUSTERING);
3834 msg_src_fields_fill(msg);
3835 msg_type_set(msg, type);
3836 return msg;
3837}
3838
3839/**
3840 * Return a message back to the message pool.
3841 */
3842static void
3843msg_pool_return(msg* msg)
3844{
3845 as_fabric_msg_put(msg);
3846}
3847
3848/**
3849 * Determines if the received message is old to be ignored.
3850 *
3851 * This is detemined by comparing the message hlc timestamp and monotonic
3852 * timestamps with the cluster formation hlc and monotonic times.
3853 *
3854 * @param cluster_modified_hlc_ts the hlc timestamp when for current cluster
3855 * change happened. Sent to avoid locking in this function.
3856 * @param cluster_modified_time the monotonic timestamp when for current
3857 * cluster change happened. Sento to avoid locking in this function.
3858 * @param msg_recv_ts the monotonic timestamp for plugin data receive.
3859 * @param msg_hlc_ts the hlc timestamp for plugin data receive.
3860 * @return true if plugin data is obsolete, false otherwise.
3861 */
3862bool
3863msg_is_obsolete(as_hlc_timestamp cluster_modified_hlc_ts,
3864 cf_clock cluster_modified_time, cf_clock msg_recv_ts,
3865 as_hlc_msg_timestamp* msg_hlc_ts)
3866{
3867 if (as_hlc_send_timestamp_order(cluster_modified_hlc_ts, msg_hlc_ts)
3868 != AS_HLC_HAPPENS_BEFORE) {
3869 // Cluster formation time after message send or the order is unknown,
3870 // assume cluster formation is after message received.
3871 // The caller should ignore this message.
3872 return true;
3873 }
3874
3875 // MSG should be atleast after cluster formation time + one hb interval to
3876 // send out our cluster state + one network delay for our information to
3877 // reach the remote node + one hb for the other node to send out the his
3878 // updated state +
3879 // one network delay for the updated state to reach us.
3880 if (cluster_modified_time + 2 * as_hb_tx_interval_get()
3881 + 2 * g_config.fabric_latency_max_ms > msg_recv_ts) {
3882 return true;
3883 }
3884
3885 return false;
3886}
3887
3888/**
3889 * Send a message to all input nodes. This is best effort some sends could fail.
3890 * The message will be returned back to the pool.
3891 * @param msg the message to send.
3892 * @param nodes the nodes to send the message to.
3893 * @return 0 on successfu queueing of message (does not imply guaranteed
3894 * delivery), -1 if the message could not be queued.
3895 */
3896static int
3897msg_node_send(msg* msg, cf_node node)
3898{
3899 int rv = as_fabric_send(node, msg, AS_FABRIC_CHANNEL_CTRL);
3900 if (rv) {
3901 // Fabric did not clean up the message, return it back to the message
3902 // pool.
3903 msg_pool_return(msg);
3904 }
3905 return rv;
3906}
3907
3908/**
3909 * Send a message to all input nodes. This is best effort some sends could fail.
3910 * The message will be returned back to the pool.
3911 * @param msg the message to send.
3912 * @param nodes the nodes to send the message to.
3913 * @return the number of nodes the message was sent to. Does not imply
3914 * guaranteed receipt by these nodes however.
3915 */
3916static int
3917msg_nodes_send(msg* msg, cf_vector* nodes)
3918{
3919 int node_count = cf_vector_size(nodes);
3920 int sent_count = 0;
3921
3922 if (node_count <= 0) {
3923 return sent_count;
3924 }
3925
3926 int alloc_size = node_count * sizeof(cf_node);
3927 cf_node* send_list = (cf_node*)BUFFER_ALLOC_OR_DIE(alloc_size);
3928
3929 vector_array_cpy(send_list, nodes, node_count);
3930
3931 if (as_fabric_send_list(send_list, node_count, msg, AS_FABRIC_CHANNEL_CTRL)
3932 != 0) {
3933 // Fabric did not clean up the message, return it back to the message
3934 // pool.
3935 msg_pool_return(msg);
3936 }
3937
3938 BUFFER_FREE(send_list, alloc_size);
3939 return sent_count;
3940}
3941
3942/*
3943 * ----------------------------------------------------------------------------
3944 * Paxos common
3945 * ----------------------------------------------------------------------------
3946 */
3947
3948/**
3949 * Compare paxos proposal ids. Compares the sequence numbers, ties in sequence
3950 * number are broken by nodeids.
3951 *
3952 * @param id1 the first identifier.
3953 * @param id2 the second identifier.
3954 *
3955 * @return 0 if id1 equals id2, 1 if id1 > id2 and -1 if id1 < id2.
3956 */
3957static int
3958paxos_proposal_id_compare(as_paxos_proposal_id* id1, as_paxos_proposal_id* id2)
3959{
3960 if (id1->sequence_number != id2->sequence_number) {
3961 return id1->sequence_number > id2->sequence_number ? 1 : -1;
3962 }
3963
3964 // Sequence numbers match, compare nodeids.
3965 if (id1->src_nodeid != id2->src_nodeid) {
3966 return id1->src_nodeid > id2->src_nodeid ? 1 : -1;
3967 }
3968
3969 // Node id and sequence numbers match.
3970 return 0;
3971}
3972
3973/*
3974 * ----------------------------------------------------------------------------
3975 * Paxos proposer
3976 * ----------------------------------------------------------------------------
3977 */
3978
3979/**
3980 * Dump paxos proposer state to logs.
3981 */
3982static void
3983paxos_proposer_dump(bool verbose)
3984{
3985 CLUSTERING_LOCK();
3986
3987 // Output paxos proposer state.
3988 switch (g_proposer.state) {
3989 case AS_PAXOS_PROPOSER_STATE_IDLE:
3990 INFO("CL: paxos proposer: idle");
3991 break;
3992 case AS_PAXOS_PROPOSER_STATE_PREPARE_SENT:
3993 INFO("CL: paxos proposer: prepare sent");
3994 break;
3995 case AS_PAXOS_PROPOSER_STATE_ACCEPT_SENT:
3996 INFO("CL: paxos proposer: accept sent");
3997 break;
3998 }
3999
4000 if (verbose) {
4001 if (g_proposer.state != AS_PAXOS_PROPOSER_STATE_IDLE) {
4002 INFO("CL: paxos proposal start time: %"PRIu64" now: %"PRIu64,
4003 g_proposer.paxos_round_start_time, cf_getms());
4004 INFO("CL: paxos proposed cluster key: %"PRIx64,
4005 g_proposer.proposed_value.cluster_key);
4006 INFO("CL: paxos proposed sequence: %"PRIu64,
4007 g_proposer.sequence_number);
4008 log_cf_node_vector("CL: paxos proposed succession:",
4009 &g_proposer.proposed_value.succession_list, CF_INFO);
4010 log_cf_node_vector("CL: paxos promises received:",
4011 &g_proposer.promises_received, CF_INFO);
4012 log_cf_node_vector("CL: paxos accepted received:",
4013 &g_proposer.accepted_received, CF_INFO);
4014 }
4015 }
4016
4017 CLUSTERING_UNLOCK();
4018}
4019
4020/**
4021 * Reset state on failure of a paxos round.
4022 */
4023static void
4024paxos_proposer_reset()
4025{
4026 CLUSTERING_LOCK();
4027
4028 // Flipping state to idle to indicate paxos round is over.
4029 g_proposer.state = AS_PAXOS_PROPOSER_STATE_IDLE;
4030 memset(&g_proposer.sequence_number, 0, sizeof(g_proposer.sequence_number));
4031
4032 g_proposer.proposed_value.cluster_key = 0;
4033 vector_clear(&g_proposer.proposed_value.succession_list);
4034
4035 vector_clear(&g_proposer.acceptors);
4036
4037 DETAIL("paxos round over for proposal id %"PRIx64":%"PRIu64,
4038 config_self_nodeid_get(), g_proposer.sequence_number);
4039
4040 CLUSTERING_UNLOCK();
4041}
4042
4043/**
4044 * Invoked to fail an ongoing paxos proposal.
4045 */
4046static void
4047paxos_proposer_fail()
4048{
4049 // Cleanup state for the paxos round.
4050 paxos_proposer_reset();
4051
4052 as_clustering_internal_event paxos_fail_event;
4053 memset(&paxos_fail_event, 0, sizeof(paxos_fail_event));
4054 paxos_fail_event.type = AS_CLUSTERING_INTERNAL_EVENT_PAXOS_PROPOSER_FAIL;
4055
4056 internal_event_dispatch(&paxos_fail_event);
4057}
4058
4059/**
4060 * Indicates if a paxos proposal from self node is active.
4061 */
4062static bool
4063paxos_proposer_proposal_is_active()
4064{
4065 CLUSTERING_LOCK();
4066 bool rv = g_proposer.state != AS_PAXOS_PROPOSER_STATE_IDLE;
4067 CLUSTERING_UNLOCK();
4068 return rv;
4069}
4070
4071/**
4072 * Send paxos prepare message current list of acceptor nodes.
4073 */
4074static void
4075paxos_proposer_prepare_send()
4076{
4077 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_PAXOS_PREPARE);
4078
4079 CLUSTERING_LOCK();
4080
4081 // Set the sequence number
4082 msg_sequence_number_set(msg, g_proposer.sequence_number);
4083
4084 log_cf_node_vector("paxos prepare message sent to:", &g_proposer.acceptors,
4085 CF_DEBUG);
4086
4087 g_proposer.prepare_send_time = cf_getms();
4088
4089 cf_vector* acceptors = vector_stack_lockless_create(cf_node);
4090 vector_copy(acceptors, &g_proposer.acceptors);
4091
4092 CLUSTERING_UNLOCK();
4093
4094 // Sent the message to the acceptors.
4095 msg_nodes_send(msg, acceptors);
4096 cf_vector_destroy(acceptors);
4097}
4098
4099/**
4100 * Send paxos accept message current list of acceptor nodes.
4101 */
4102static void
4103paxos_proposer_accept_send()
4104{
4105 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_PAXOS_ACCEPT);
4106
4107 CLUSTERING_LOCK();
4108
4109 // Set the sequence number
4110 msg_sequence_number_set(msg, g_proposer.sequence_number);
4111
4112 // Skip send of the proposed value for accept, since we do not use it. Learn
4113 // message is the only way a consensus value is sent out.
4114 log_cf_node_vector("paxos accept message sent to:", &g_proposer.acceptors,
4115 CF_DEBUG);
4116
4117 g_proposer.accept_send_time = cf_getms();
4118
4119 cf_vector* acceptors = vector_stack_lockless_create(cf_node);
4120 vector_copy(acceptors, &g_proposer.acceptors);
4121
4122 CLUSTERING_UNLOCK();
4123
4124 // Sent the message to the acceptors.
4125 msg_nodes_send(msg, acceptors);
4126 cf_vector_destroy(acceptors);
4127}
4128
4129/**
4130 * Send paxos learn message current list of acceptor nodes.
4131 */
4132static void
4133paxos_proposer_learn_send()
4134{
4135 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_PAXOS_LEARN);
4136
4137 CLUSTERING_LOCK();
4138
4139 // Set the sequence number
4140 msg_sequence_number_set(msg, g_proposer.sequence_number);
4141
4142 // Set the cluster key
4143 msg_cluster_key_set(msg, g_proposer.proposed_value.cluster_key);
4144
4145 // Set the succession list
4146 msg_succession_list_set(msg, &g_proposer.proposed_value.succession_list);
4147
4148 log_cf_node_vector("paxos learn message sent to:", &g_proposer.acceptors,
4149 CF_DEBUG);
4150
4151 g_proposer.learn_send_time = cf_getms();
4152
4153 cf_vector* acceptors = vector_stack_lockless_create(cf_node);
4154 vector_copy(acceptors, &g_proposer.acceptors);
4155
4156 CLUSTERING_UNLOCK();
4157
4158 // Sent the message to the acceptors.
4159 msg_nodes_send(msg, acceptors);
4160 cf_vector_destroy(acceptors);
4161}
4162
4163/**
4164 * Handle an incoming paxos promise message.
4165 */
/**
 * Handle an incoming paxos promise message.
 *
 * Validates the sender and proposal id, records the promise, and moves the
 * proposer to the accept phase once a majority of acceptors have promised.
 */
static void
paxos_proposer_promise_handle(as_clustering_internal_event* event)
{
	cf_node src_nodeid = event->msg_src_nodeid;
	msg* msg = event->msg;

	DEBUG("received paxos promise from node %"PRIx64, src_nodeid);

	CLUSTERING_LOCK();
	if (g_proposer.state != AS_PAXOS_PROPOSER_STATE_PREPARE_SENT) {
		// We are not in the prepare phase. Reject this message.
		DEBUG("ignoring paxos promise from node %"PRIx64" - we are not in prepare phase",
				src_nodeid);
		goto Exit;
	}

	if (vector_find(&g_proposer.acceptors, &src_nodeid) < 0) {
		// Only nodes in this round's acceptor list may promise.
		WARNING("ignoring paxos promise from node %"PRIx64" - it is not in acceptor list",
				src_nodeid);
		goto Exit;
	}

	as_paxos_sequence_number sequence_number = 0;
	if (msg_sequence_number_get(msg, &sequence_number) != 0) {
		WARNING("ignoring paxos promise from node %"PRIx64" with invalid proposal id",
				src_nodeid);
		goto Exit;
	}

	if (sequence_number != g_proposer.sequence_number) {
		// Not a matching promise message. Ignore.
		INFO("ignoring paxos promise from node %"PRIx64" because its proposal id %"PRIu64" does not match expected id %"PRIu64,
				src_nodeid, sequence_number,
				g_proposer.sequence_number);
		goto Exit;
	}

	// Append-unique dedupes retransmitted promises from the same node.
	cf_vector_append_unique(&g_proposer.promises_received, &src_nodeid);

	int promised_count = cf_vector_size(&g_proposer.promises_received);
	int acceptor_count = cf_vector_size(&g_proposer.acceptors);

	// Use majority quorum to move on.
	if (promised_count >= 1 + (acceptor_count / 2)) {
		// We have quorum number of promises. go ahead to the accept phase.
		g_proposer.state = AS_PAXOS_PROPOSER_STATE_ACCEPT_SENT;
		paxos_proposer_accept_send();
	}

Exit:
	CLUSTERING_UNLOCK();
}
4218
4219/**
4220 * Handle an incoming paxos prepare nack message.
4221 */
/**
 * Handle an incoming paxos prepare nack message.
 *
 * A valid nack for the current proposal id means an acceptor refused the
 * prepare, so the ongoing round is aborted.
 */
static void
paxos_proposer_prepare_nack_handle(as_clustering_internal_event* event)
{
	cf_node src_nodeid = event->msg_src_nodeid;
	msg* msg = event->msg;

	DEBUG("received paxos prepare nack from node %"PRIx64, src_nodeid);

	CLUSTERING_LOCK();
	if (g_proposer.state != AS_PAXOS_PROPOSER_STATE_PREPARE_SENT) {
		// We are not in the prepare phase. Reject this message.
		INFO("ignoring paxos prepare nack from node %"PRIx64" - we are not in prepare phase",
				src_nodeid);
		goto Exit;
	}

	if (vector_find(&g_proposer.acceptors, &src_nodeid) < 0) {
		// Only nodes in this round's acceptor list may nack.
		WARNING("ignoring paxos prepare nack from node %"PRIx64" - it is not in acceptor list",
				src_nodeid);
		goto Exit;
	}

	as_paxos_sequence_number sequence_number = 0;
	if (msg_sequence_number_get(msg, &sequence_number) != 0) {
		WARNING("ignoring paxos prepare nack from node %"PRIx64" with invalid proposal id",
				src_nodeid);
		goto Exit;
	}

	if (sequence_number != g_proposer.sequence_number) {
		// Not a matching prepare nack message. Ignore.
		INFO("ignoring paxos prepare nack from node %"PRIx64" because its proposal id %"PRIu64" does not match expected id %"PRIu64,
				src_nodeid, sequence_number,
				g_proposer.sequence_number);
		goto Exit;
	}

	// A nack from any acceptor is fatal for the round.
	INFO(
			"aborting current paxos proposal because of a prepare nack from node %"PRIx64,
			src_nodeid);
	paxos_proposer_fail();

Exit:
	CLUSTERING_UNLOCK();
}
4267
4268/**
4269 * Invoked when all acceptors have accepted the proposal.
4270 */
4271static void
4272paxos_proposer_success()
4273{
4274 CLUSTERING_LOCK();
4275
4276 // Set the proposer to back idle state.
4277 g_proposer.state = AS_PAXOS_PROPOSER_STATE_IDLE;
4278
4279 // Send out learn message and enable retransmits of learn message.
4280 g_proposer.learn_retransmit_needed = true;
4281 paxos_proposer_learn_send();
4282
4283 // Retain the sequence_number, cluster key and succession list for
4284 // retransmits of the learn message.
4285 as_clustering_internal_event paxos_success_event;
4286 memset(&paxos_success_event, 0, sizeof(paxos_success_event));
4287 paxos_success_event.type =
4288 AS_CLUSTERING_INTERNAL_EVENT_PAXOS_PROPOSER_SUCCESS;
4289
4290 CLUSTERING_UNLOCK();
4291}
4292
4293/**
4294 * Indicates if the proposer can accept, accepted messages.
4295 */
/**
 * Indicates if the proposer can accept, accepted messages.
 *
 * @param src_nodeid the sender of the accepted message.
 * @param msg the accepted message.
 * @return true if the accepted message should be counted, false otherwise.
 */
static bool
paxos_proposer_can_accept_accepted(cf_node src_nodeid, msg* msg)
{
	bool rv = false;

	CLUSTERING_LOCK();
	// We also allow accepted messages in the idle state to deal with a loss of
	// the learn message.
	if (g_proposer.state != AS_PAXOS_PROPOSER_STATE_ACCEPT_SENT
			&& g_proposer.state != AS_PAXOS_PROPOSER_STATE_IDLE) {
		// We are not in the accept phase. Reject this message.
		DEBUG("ignoring paxos accepted from node %"PRIx64" - we are not in accept phase. Actual phase %d",
				src_nodeid, g_proposer.state);
		goto Exit;
	}

	if (vector_find(&g_proposer.acceptors, &src_nodeid) < 0) {
		// Only nodes in this round's acceptor list may send accepted.
		WARNING("ignoring paxos accepted from node %"PRIx64" - it is not in acceptor list",
				src_nodeid);
		goto Exit;
	}

	as_paxos_sequence_number sequence_number = 0;
	if (msg_sequence_number_get(msg, &sequence_number) != 0) {
		WARNING("ignoring paxos accepted from node %"PRIx64" with invalid proposal id",
				src_nodeid);
		goto Exit;
	}

	if (sequence_number != g_proposer.sequence_number) {
		// Not a matching accepted message. Ignore.
		INFO("ignoring paxos accepted from node %"PRIx64" because its proposal id %"PRIu64" does not match expected id %"PRIu64,
				src_nodeid, sequence_number,
				g_proposer.sequence_number);
		goto Exit;
	}

	if (g_proposer.proposed_value.cluster_key == g_register.cluster_key
			&& vector_equals(&g_proposer.proposed_value.succession_list,
					&g_register.succession_list)) {
		// The register is already synced for this proposal. We can ignore this
		// accepted message.
		INFO("ignoring paxos accepted from node %"PRIx64" because its proposal id %"PRIu64" is a duplicate",
				src_nodeid, sequence_number
		);
		goto Exit;
	}

	rv = true;
Exit:
	CLUSTERING_UNLOCK();
	return rv;
}
4349
4350/**
4351 * Handle an incoming paxos accepted message.
4352 */
/**
 * Handle an incoming paxos accepted message.
 *
 * Counts the accepted toward the current proposal; when every acceptor has
 * accepted, verifies the minimum cluster size and completes the round.
 */
static void
paxos_proposer_accepted_handle(as_clustering_internal_event* event)
{
	cf_node src_nodeid = event->msg_src_nodeid;
	msg* msg = event->msg;

	DEBUG("received paxos accepted from node %"PRIx64, src_nodeid);

	if (!paxos_proposer_can_accept_accepted(src_nodeid, msg)) {
		return;
	}

	CLUSTERING_LOCK();

	// Append-unique dedupes retransmitted accepted messages from same node.
	cf_vector_append_unique(&g_proposer.accepted_received, &src_nodeid);

	int accepted_count = cf_vector_size(&g_proposer.accepted_received);
	int acceptor_count = cf_vector_size(&g_proposer.acceptors);

	// Use a simple quorum, all acceptors should accept for success.
	if (accepted_count == acceptor_count) {
		// This is the point after which the succession list will not change for
		// this paxos round. Ensure that we meet the minimum cluster size
		// criterion.
		int cluster_size = cf_vector_size(
				&g_proposer.proposed_value.succession_list);
		if (cluster_size < g_config.clustering_config.cluster_size_min) {
			WARNING(
					"failing paxos round - the remaining number of nodes %d is less than minimum cluster size %d",
					cluster_size, g_config.clustering_config.cluster_size_min);
			// Fail paxos.
			paxos_proposer_fail();
			goto Exit;
		}

		// We have quorum number of accepted nodes. The proposal succeeded.
		paxos_proposer_success();
	}

Exit:
	CLUSTERING_UNLOCK();
}
4395
4396/**
4397 * Handle an incoming paxos accept nack message.
4398 */
4399static void
4400paxos_proposer_accept_nack_handle(as_clustering_internal_event* event)
4401{
4402 cf_node src_nodeid = event->msg_src_nodeid;
4403 msg* msg = event->msg;
4404
4405 DEBUG("received paxos accept nack from node %"PRIx64, src_nodeid);
4406
4407 CLUSTERING_LOCK();
4408 if (g_proposer.state != AS_PAXOS_PROPOSER_STATE_ACCEPT_SENT) {
4409 // We are not in the accept phase. Reject this message.
4410 INFO("ignoring paxos accept nack from node %"PRIx64" - we are not in accept phase",
4411 src_nodeid);
4412 goto Exit;
4413 }
4414
4415 if (vector_find(&g_proposer.acceptors, &src_nodeid) < 0) {
4416 WARNING("ignoring paxos accept nack from node %"PRIx64" - it is not in acceptor list",
4417 src_nodeid);
4418 goto Exit;
4419 }
4420
4421 as_paxos_sequence_number sequence_number = 0;
4422 if (msg_sequence_number_get(msg, &sequence_number) != 0) {
4423 WARNING("ignoring paxos accept nack from node %"PRIx64" with invalid proposal id",
4424 src_nodeid);
4425 goto Exit;
4426 }
4427
4428 if (sequence_number != g_proposer.sequence_number) {
4429 // Not a matching accept nack message. Ignore.
4430 INFO("ignoring paxos accept nack from node %"PRIx64"because its proposal id %"PRIu64" does not match expected id %"PRIu64,
4431 src_nodeid, sequence_number,
4432 g_proposer.sequence_number);
4433 goto Exit;
4434 }
4435
4436 INFO(
4437 "aborting current paxos proposal because of an accept nack from node %"PRIx64,
4438 src_nodeid);
4439 paxos_proposer_fail();
4440
4441Exit:
4442 CLUSTERING_UNLOCK();
4443}
4444
4445/**
4446 * Handle an incoming message.
4447 */
4448static void
4449paxos_proposer_msg_event_handle(as_clustering_internal_event* msg_event)
4450{
4451 switch (msg_event->msg_type) {
4452 case AS_CLUSTERING_MSG_TYPE_PAXOS_PROMISE:
4453 paxos_proposer_promise_handle(msg_event);
4454 break;
4455 case AS_CLUSTERING_MSG_TYPE_PAXOS_PREPARE_NACK:
4456 paxos_proposer_prepare_nack_handle(msg_event);
4457 break;
4458 case AS_CLUSTERING_MSG_TYPE_PAXOS_ACCEPTED:
4459 paxos_proposer_accepted_handle(msg_event);
4460 break;
4461 case AS_CLUSTERING_MSG_TYPE_PAXOS_ACCEPT_NACK:
4462 paxos_proposer_accept_nack_handle(msg_event);
4463 break;
4464 default: // Other message types are not of interest.
4465 break;
4466 }
4467}
4468
4469/**
4470 * Handle heartbeat event.
4471 */
4472static void
4473paxos_proposer_hb_event_handle(as_clustering_internal_event* hb_event)
4474{
4475 if (!paxos_proposer_proposal_is_active()) {
4476 return;
4477 }
4478
4479 CLUSTERING_LOCK();
4480 for (int i = 0; i < hb_event->hb_n_events; i++) {
4481 if (hb_event->hb_events[i].evt == AS_HB_NODE_DEPART) {
4482 cf_node departed_node = hb_event->hb_events[i].nodeid;
4483 if (vector_find(&g_proposer.acceptors, &departed_node)) {
4484 // One of the acceptors has departed. Abort the paxos proposal.
4485 INFO("paxos acceptor %"PRIx64" departed - aborting current paxos proposal", departed_node);
4486 paxos_proposer_fail();
4487 break;
4488 }
4489 }
4490 }
4491 CLUSTERING_UNLOCK();
4492}
4493
4494/**
4495 * Check and retransmit prepare message if paxos promise messages have not yet
4496 * being received.
4497 */
4498static void
4499paxos_proposer_prepare_check_retransmit()
4500{
4501 CLUSTERING_LOCK();
4502 cf_clock now = cf_getms();
4503 if (g_proposer.state == AS_PAXOS_PROPOSER_STATE_PREPARE_SENT
4504 && g_proposer.prepare_send_time + paxos_msg_timeout() < now) {
4505 paxos_proposer_prepare_send();
4506 }
4507 CLUSTERING_UNLOCK();
4508}
4509
4510/**
4511 * Check and retransmit accept message if paxos accepted has yet being received.
4512 */
4513static void
4514paxos_proposer_accept_check_retransmit()
4515{
4516 CLUSTERING_LOCK();
4517 cf_clock now = cf_getms();
4518 if (g_proposer.state == AS_PAXOS_PROPOSER_STATE_ACCEPT_SENT
4519 && g_proposer.accept_send_time + paxos_msg_timeout() < now) {
4520 paxos_proposer_accept_send();
4521 }
4522 CLUSTERING_UNLOCK();
4523}
4524
4525/**
4526 * Check and retransmit learn message if all acceptors have not applied the
4527 * current cluster change.
4528 */
4529static void
4530paxos_proposer_learn_check_retransmit()
4531{
4532 CLUSTERING_LOCK();
4533 cf_clock now = cf_getms();
4534 bool learn_timedout = g_proposer.learn_retransmit_needed
4535 && (g_proposer.state == AS_PAXOS_PROPOSER_STATE_IDLE)
4536 && (g_proposer.proposed_value.cluster_key != 0)
4537 && (g_proposer.learn_send_time + paxos_msg_timeout() < now);
4538
4539 if (learn_timedout) {
4540 // If the register is not synced, most likely the learn message did not
4541 // make it through, retransmit the learn message to move the paxos
4542 // acceptor forward and start register sync.
4543 INFO("retransmitting paxos learn message");
4544 paxos_proposer_learn_send();
4545 }
4546 CLUSTERING_UNLOCK();
4547}
4548
4549/**
4550 * Handle a timer event and retransmit messages if required.
4551 */
4552static void
4553paxos_proposer_timer_event_handle()
4554{
4555 CLUSTERING_LOCK();
4556 switch (g_proposer.state) {
4557 case AS_PAXOS_PROPOSER_STATE_IDLE:
4558 paxos_proposer_learn_check_retransmit();
4559 break;
4560 case AS_PAXOS_PROPOSER_STATE_PREPARE_SENT:
4561 paxos_proposer_prepare_check_retransmit();
4562 break;
4563 case AS_PAXOS_PROPOSER_STATE_ACCEPT_SENT:
4564 paxos_proposer_accept_check_retransmit();
4565 break;
4566 }
4567 CLUSTERING_UNLOCK();
4568}
4569
4570/**
4571 * Handle register getting synched.
4572 */
4573static void
4574paxos_proposer_register_synched()
4575{
4576 CLUSTERING_LOCK();
4577 // Register synched we no longer need learn messages to be retransmitted.
4578 g_proposer.learn_retransmit_needed = false;
4579 CLUSTERING_UNLOCK();
4580}
4581
4582/**
4583 * Initialize paxos proposer state.
4584 */
4585static void
4586paxos_proposer_init()
4587{
4588 CLUSTERING_LOCK();
4589 // Memset to zero which ensures that all proposer state variables have zero
4590 // which is the correct initial value for elements other that contained
4591 // vectors and status.
4592 memset(&g_proposer, 0, sizeof(g_proposer));
4593
4594 // Initialize the proposer state.
4595 // No paxos round running, so the state has to be idle.
4596 g_proposer.state = AS_PAXOS_PROPOSER_STATE_IDLE;
4597
4598 // Set the current acceptor list to be empty.
4599 vector_lockless_init(&g_proposer.acceptors, cf_node);
4600
4601 // Set the current promises received node list to empty.
4602 vector_lockless_init(&g_proposer.promises_received, cf_node);
4603
4604 // Set the current accepted received node list to empty.
4605 vector_lockless_init(&g_proposer.accepted_received, cf_node);
4606
4607 // Initialize the proposed value.
4608 vector_lockless_init(&g_proposer.proposed_value.succession_list, cf_node);
4609 g_proposer.proposed_value.cluster_key = 0;
4610
4611 CLUSTERING_UNLOCK();
4612}
4613
4614/**
4615 * Log paxos results.
4616 */
/**
 * Log paxos results.
 *
 * @param result the outcome returned by paxos_proposer_proposal_start().
 * @param new_succession_list the succession list the round was attempted
 * with - used only for the too-small log message.
 */
static void
paxos_result_log(as_paxos_start_result result, cf_vector* new_succession_list)
{
	CLUSTERING_LOCK();
	switch (result) {
	case AS_PAXOS_RESULT_STARTED: {
		// Running check required because paxos round finished for single node
		// cluster by this time.
		if (paxos_proposer_proposal_is_active()) {
			INFO("paxos round started - cluster key: %"PRIx64,
					g_proposer.proposed_value.cluster_key);
			log_cf_node_vector("paxos round started - succession list:",
					&g_proposer.proposed_value.succession_list, CF_INFO);
		}
		break;
	}

	case AS_PAXOS_RESULT_CLUSTER_TOO_SMALL: {
		WARNING(
				"paxos round aborted - new cluster size %d less than min cluster size %d",
				cf_vector_size(new_succession_list),
				g_config.clustering_config.cluster_size_min);
		break;
	}

	case AS_PAXOS_RESULT_ROUND_RUNNING: {
		// Should never happen in practice. Let the old round finish or timeout.
		WARNING(
				"older paxos round still running - should have finished by now");
		// No break needed - this is the last case of the switch.
	}
	}

	CLUSTERING_UNLOCK();
}
4651
4652/**
4653 * Start a new paxos round.
4654 *
4655 * @param new_succession_list the new succession list.
4656 * @param acceptor_list the list of nodes to use for paxos acceptors.
4657 * @param current_cluster_key the current cluster key
4658 * @param current_succession_list the current succession list, can be null if
4659 * this node is an orphan.
4660 */
/**
 * Start a new paxos round.
 *
 * @param new_succession_list the new succession list to propose.
 * @param acceptor_list the list of nodes to use for paxos acceptors.
 * @return AS_PAXOS_RESULT_STARTED on success, else the reason the round
 * could not be started.
 */
static as_paxos_start_result
paxos_proposer_proposal_start(cf_vector* new_succession_list,
		cf_vector* acceptor_list)
{
	// Refuse to propose a cluster smaller than the configured minimum.
	if (cf_vector_size(new_succession_list)
			< g_config.clustering_config.cluster_size_min) {
		// Fail paxos.
		return AS_PAXOS_RESULT_CLUSTER_TOO_SMALL;
	}

	CLUSTERING_LOCK();

	as_paxos_start_result result;
	if (paxos_proposer_proposal_is_active()) {
		// Only one round may run at a time.
		result = AS_PAXOS_RESULT_ROUND_RUNNING;
		goto Exit;
	}

	// Update state to prepare.
	g_proposer.state = AS_PAXOS_PROPOSER_STATE_PREPARE_SENT;

	// HLC timestamps are monotonic, so they double as paxos sequence numbers.
	g_proposer.sequence_number = as_hlc_timestamp_now();

	g_proposer.paxos_round_start_time = cf_getms();

	// Populate the proposed value struct with new succession list and a new
	// cluster key.
	vector_clear(&g_proposer.proposed_value.succession_list);
	vector_copy(&g_proposer.proposed_value.succession_list,
			new_succession_list);
	g_proposer.proposed_value.cluster_key = clustering_cluster_key_generate(
			g_register.cluster_key);

	// Remember the acceptors for this paxos round.
	vector_clear(&g_proposer.acceptors);
	vector_copy(&g_proposer.acceptors, acceptor_list);

	// Clear the promise received and accepted received vectors for this new
	// round.
	vector_clear(&g_proposer.promises_received);
	vector_clear(&g_proposer.accepted_received);

	paxos_proposer_prepare_send();

	result = AS_PAXOS_RESULT_STARTED;

Exit:
	CLUSTERING_UNLOCK();

	return result;
}
4712
4713/**
4714 * Paxos proposer monitor to detect and cleanup long running and most likely
4715 * failed paxos rounds.
4716 */
4717static void
4718paxos_proposer_monitor()
4719{
4720 CLUSTERING_LOCK();
4721 if (paxos_proposer_proposal_is_active()) {
4722 if (g_proposer.paxos_round_start_time + paxos_proposal_timeout()
4723 <= cf_getms()) {
4724 // Paxos round is running and has timed out.
4725 // Consider paxos round failed.
4726 INFO("paxos round timed out for proposal id %"PRIx64":%"PRIu64,
4727 config_self_nodeid_get(),
4728 g_proposer.sequence_number);
4729 paxos_proposer_fail();
4730 }
4731 }
4732 CLUSTERING_UNLOCK();
4733}
4734
4735/*
4736 * ----------------------------------------------------------------------------
4737 * Paxos acceptor
4738 * ----------------------------------------------------------------------------
4739 */
4740
4741/**
4742 * Dump paxos acceptor state to logs.
4743 */
4744static void
4745paxos_acceptor_dump(bool verbose)
4746{
4747 CLUSTERING_LOCK();
4748
4749 // Output paxos acceptor state.
4750 switch (g_acceptor.state) {
4751 case AS_PAXOS_ACCEPTOR_STATE_IDLE:
4752 INFO("CL: paxos acceptor: idle");
4753 break;
4754 case AS_PAXOS_ACCEPTOR_STATE_PROMISED:
4755 INFO("CL: paxos acceptor: promised");
4756 break;
4757 case AS_PAXOS_ACCEPTOR_STATE_ACCEPTED:
4758 INFO("CL: paxos acceptor: accepted");
4759 break;
4760 }
4761
4762 if (verbose) {
4763 if (g_acceptor.state != AS_PAXOS_ACCEPTOR_STATE_IDLE) {
4764 INFO("CL: paxos acceptor start time: %"PRIu64" now: %"PRIu64,
4765 g_acceptor.acceptor_round_start, cf_getms());
4766 INFO("CL: paxos acceptor proposal id: (%"PRIx64":%"PRIu64")",
4767 g_acceptor.last_proposal_received_id.src_nodeid,
4768 g_acceptor.last_proposal_received_id.sequence_number);
4769 INFO("CL: paxos acceptor promised time: %"PRIu64" now: %"PRIu64,
4770 g_acceptor.promise_send_time, cf_getms());
4771 INFO("CL: paxos acceptor accepted time: %"PRIu64" now: %"PRIu64,
4772 g_acceptor.accepted_send_time, cf_getms());
4773 }
4774 }
4775
4776 CLUSTERING_UNLOCK();
4777}
4778
4779/**
4780 * Reset the acceptor for the next round.
4781 */
4782static void
4783paxos_acceptor_reset()
4784{
4785 CLUSTERING_LOCK();
4786 g_acceptor.state = AS_PAXOS_ACCEPTOR_STATE_IDLE;
4787 g_acceptor.acceptor_round_start = 0;
4788 g_acceptor.promise_send_time = 0;
4789 g_acceptor.accepted_send_time = 0;
4790 CLUSTERING_UNLOCK();
4791}
4792
4793/**
4794 * Invoked to fail an ongoing paxos proposal.
4795 */
4796static void
4797paxos_acceptor_fail()
4798{
4799 // Cleanup state for the paxos round.
4800 paxos_acceptor_reset();
4801
4802 as_clustering_internal_event paxos_fail_event;
4803 memset(&paxos_fail_event, 0, sizeof(paxos_fail_event));
4804 paxos_fail_event.type = AS_CLUSTERING_INTERNAL_EVENT_PAXOS_ACCEPTOR_FAIL;
4805
4806 internal_event_dispatch(&paxos_fail_event);
4807}
4808
4809/**
4810 * Invoked on success of an ongoing paxos proposal.
4811 */
4812static void
4813paxos_acceptor_success(as_cluster_key cluster_key, cf_vector* succession_list,
4814 as_paxos_sequence_number sequence_number)
4815{
4816 // Cleanup state for the paxos round.
4817 paxos_acceptor_reset();
4818
4819 as_clustering_internal_event paxos_success_event;
4820 memset(&paxos_success_event, 0, sizeof(paxos_success_event));
4821 paxos_success_event.type =
4822 AS_CLUSTERING_INTERNAL_EVENT_PAXOS_ACCEPTOR_SUCCESS;
4823 paxos_success_event.new_succession_list = succession_list;
4824 paxos_success_event.new_cluster_key = cluster_key;
4825 paxos_success_event.new_sequence_number = sequence_number;
4826
4827 internal_event_dispatch(&paxos_success_event);
4828}
4829
4830/**
4831 * Send paxos promise message to the proposer node.
4832 * @param dest the destination node.
4833 * @param sequence_number the sequence number from the incoming message.
4834 */
4835static void
4836paxos_acceptor_promise_send(cf_node dest,
4837 as_paxos_sequence_number sequence_number)
4838{
4839 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_PAXOS_PROMISE);
4840
4841 msg_sequence_number_set(msg, sequence_number);
4842
4843 DEBUG("paxos promise message sent to node %"PRIx64" with proposal id (%"PRIx64":%"PRIu64")", dest, dest, sequence_number);
4844
4845 CLUSTERING_LOCK();
4846 g_acceptor.promise_send_time = cf_getms();
4847 CLUSTERING_UNLOCK();
4848
4849 // Send the message to the proposer.
4850 msg_node_send(msg, dest);
4851}
4852
4853/**
4854 * Send paxos prepare nack message to the proposer.
4855 * @param dest the destination node.
4856 * @param sequence_number the sequence number from the incoming message.
4857 */
4858static void
4859paxos_acceptor_prepare_nack_send(cf_node dest,
4860 as_paxos_sequence_number sequence_number)
4861{
4862 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_PAXOS_PREPARE_NACK);
4863
4864 msg_sequence_number_set(msg, sequence_number);
4865
4866 DEBUG("paxos prepare nack message sent to node %"PRIx64" with proposal id (%"PRIx64":%"PRIu64")", dest, dest, sequence_number);
4867
4868 // Send the message to the proposer.
4869 msg_node_send(msg, dest);
4870}
4871
4872/**
4873 * Send paxos accepted message to the proposer node.
4874 * @param dest the destination node.
4875 * @param sequence_number the sequence number from the incoming message.
4876 */
4877static void
4878paxos_acceptor_accepted_send(cf_node dest,
4879 as_paxos_sequence_number sequence_number)
4880{
4881 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_PAXOS_ACCEPTED);
4882
4883 msg_sequence_number_set(msg, sequence_number);
4884
4885 DEBUG("paxos accepted message sent to node %"PRIx64" with proposal id (%"PRIx64":%"PRIu64")", dest, dest, sequence_number);
4886
4887 CLUSTERING_LOCK();
4888 g_acceptor.accepted_send_time = cf_getms();
4889 CLUSTERING_UNLOCK();
4890
4891 // Send the message to the proposer.
4892 msg_node_send(msg, dest);
4893}
4894
4895/**
4896 * Send paxos accept nack message to the proposer.
4897 * @param dest the destination node.
4898 * @param sequence_number the sequence number from the incoming message.
4899 */
4900static void
4901paxos_acceptor_accept_nack_send(cf_node dest,
4902 as_paxos_sequence_number sequence_number)
4903{
4904 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_PAXOS_ACCEPT_NACK);
4905
4906 msg_sequence_number_set(msg, sequence_number);
4907
4908 DEBUG("paxos accept nack message sent to node %"PRIx64" with proposal id (%"PRIx64":%"PRIu64")", dest, dest, sequence_number);
4909
4910 // Send the message to the proposer.
4911 msg_node_send(msg, dest);
4912}
4913
4914/**
4915 * Check if the incoming prepare can be promised.
4916 */
/**
 * Check if the incoming prepare can be promised.
 *
 * @param src_nodeid the node that sent the prepare.
 * @param proposal_id the incoming proposal id.
 * @return true if this acceptor should send a promise, false otherwise.
 */
static bool
paxos_acceptor_prepare_can_promise(cf_node src_nodeid,
		as_paxos_proposal_id* proposal_id)
{
	if (!clustering_can_accept_as_proposer(src_nodeid)) {
		INFO("ignoring paxos prepare from node %"PRIx64" because it cannot be a principal",
				src_nodeid);
		return false;
	}

	bool can_promise = false;
	CLUSTERING_LOCK();
	// Compare the incoming proposal id against the last one this acceptor
	// has seen.
	int comparison = paxos_proposal_id_compare(proposal_id,
			&g_acceptor.last_proposal_received_id);

	switch (g_acceptor.state) {
	case AS_PAXOS_ACCEPTOR_STATE_IDLE:
	case AS_PAXOS_ACCEPTOR_STATE_ACCEPTED: {
		// Allow only higher valued proposal to prevent replays and also to
		// ensure convergence in the face of competing proposals.
		can_promise = comparison > 0;
	}
		break;
	case AS_PAXOS_ACCEPTOR_STATE_PROMISED: {
		// We allow for replays of the prepare message as well so that the
		// proposer can receive a promise for this node's lost promise message.
		can_promise = comparison >= 0;
	}
		break;
	}

	CLUSTERING_UNLOCK();

	return can_promise;
}
4952
4953/**
4954 * Handle an incoming paxos prepare message.
4955 */
/**
 * Handle an incoming paxos prepare message.
 *
 * Validates the proposal id, either nacks an obsolete prepare or records the
 * proposal and replies with a promise.
 */
static void
paxos_acceptor_prepare_handle(as_clustering_internal_event* event)
{
	cf_node src_nodeid = event->msg_src_nodeid;
	DEBUG("received paxos prepare from node %"PRIx64, src_nodeid);

	as_paxos_proposal_id proposal_id = { 0 };
	if (msg_event_proposal_id_get(event, &proposal_id) != 0) {
		INFO("ignoring paxos prepare from node %"PRIx64" with invalid proposal id",
				src_nodeid);
		return;
	}

	if (!paxos_acceptor_prepare_can_promise(src_nodeid, &proposal_id)) {
		// Obsolete or unacceptable proposal - nack so the proposer can abort.
		INFO("ignoring paxos prepare from node %"PRIx64" with obsolete proposal id (%"PRIx64":%"PRIu64")", proposal_id.src_nodeid, proposal_id.src_nodeid, proposal_id.sequence_number);
		paxos_acceptor_prepare_nack_send(src_nodeid,
				proposal_id.sequence_number);
		return;
	}

	CLUSTERING_LOCK();

	bool is_new_proposal = paxos_proposal_id_compare(&proposal_id,
			&g_acceptor.last_proposal_received_id) != 0;

	if (is_new_proposal) {
		// Remember this to be the last proposal id we received.
		memcpy(&g_acceptor.last_proposal_received_id, &proposal_id,
				sizeof(proposal_id));

		// Update the round start time.
		g_acceptor.acceptor_round_start = cf_getms();

		// Switch to promised state.
		g_acceptor.state = AS_PAXOS_ACCEPTOR_STATE_PROMISED;
	}
	else {
		// This is a retransmit or delayed message in which case we do not
		// update the state.
		// If we have already accepted this proposal, we would want to remain in
		// accepted state.
	}

	// The proposal is promised. Send back a paxos promise.
	paxos_acceptor_promise_send(src_nodeid, proposal_id.sequence_number);

	CLUSTERING_UNLOCK();
}
5004
5005/**
5006 * Check if the incoming accept can be accepted.
5007 */
/**
 * Check if the incoming accept can be accepted.
 *
 * @param src_nodeid the node that sent the accept.
 * @param proposal_id the incoming proposal id.
 * @return true if this acceptor should accept, false otherwise.
 */
static bool
paxos_acceptor_accept_can_accept(cf_node src_nodeid,
		as_paxos_proposal_id* proposal_id)
{
	if (!clustering_can_accept_as_proposer(src_nodeid)) {
		INFO("ignoring paxos accept from node %"PRIx64" because it cannot be a principal",
				src_nodeid);
		return false;
	}

	bool can_accept = false;
	CLUSTERING_LOCK();
	// Compare the incoming proposal id against the last one this acceptor
	// has seen.
	int comparison = paxos_proposal_id_compare(proposal_id,
			&g_acceptor.last_proposal_received_id);

	switch (g_acceptor.state) {
	case AS_PAXOS_ACCEPTOR_STATE_IDLE:
	case AS_PAXOS_ACCEPTOR_STATE_PROMISED:
	case AS_PAXOS_ACCEPTOR_STATE_ACCEPTED: {
		// We allow for replays of the accept message as well, so that the
		// proposer can receive an accepted for this node's lost accepted
		// message.
		can_accept = comparison >= 0;
	}
		break;
	}

	CLUSTERING_UNLOCK();

	return can_accept;
}
5039
5040/**
5041 * Handle an incoming paxos accept message.
5042 */
/**
 * Handle an incoming paxos accept message.
 *
 * Validates the proposal id, either nacks an obsolete accept or records the
 * proposal, moves to accepted state, and replies with an accepted message.
 */
static void
paxos_acceptor_accept_handle(as_clustering_internal_event* event)
{
	cf_node src_nodeid = event->msg_src_nodeid;

	DEBUG("received paxos accept from node %"PRIx64, src_nodeid);

	// Its ok to proceed even is paxos is running, because this could be a
	// competing proposal and the winner will be decided by paxos sequence
	// number.
	as_paxos_proposal_id proposal_id = { 0 };
	if (msg_event_proposal_id_get(event, &proposal_id) != 0) {
		INFO("ignoring paxos accept from node %"PRIx64" with invalid proposal id",
				src_nodeid);
		return;
	}

	if (!paxos_acceptor_accept_can_accept(src_nodeid, &proposal_id)) {
		// Obsolete or unacceptable proposal - nack so the proposer can abort.
		INFO("ignoring paxos accept from node %"PRIx64" with obsolete proposal id (%"PRIx64":%"PRIu64")", proposal_id.src_nodeid, proposal_id.src_nodeid, proposal_id.sequence_number);
		paxos_acceptor_accept_nack_send(src_nodeid,
				proposal_id.sequence_number);
		return;
	}

	CLUSTERING_LOCK();

	bool is_new_proposal = paxos_proposal_id_compare(&proposal_id,
			&g_acceptor.last_proposal_received_id) != 0;

	if (is_new_proposal) {
		// This node has missed the prepare message, but received the accept
		// message. This is alright.

		// Remember this to be the last proposal id we received.
		memcpy(&g_acceptor.last_proposal_received_id, &proposal_id,
				sizeof(proposal_id));

		// Mark this as the start of the acceptor paxos round.
		g_acceptor.acceptor_round_start = cf_getms();
	}

	g_acceptor.state = AS_PAXOS_ACCEPTOR_STATE_ACCEPTED;
	// The proposal is accepted. Send back a paxos accept.
	paxos_acceptor_accepted_send(src_nodeid, proposal_id.sequence_number);

	CLUSTERING_UNLOCK();
}
5090
5091/**
5092 * Handle an incoming paxos learn message.
5093 */
/**
 * Handle an incoming paxos learn message.
 *
 * Validates the sender and proposal id against this acceptor's accepted
 * proposal, extracts the consensus value (cluster key and succession list),
 * and on success applies the new cluster configuration.
 */
static void
paxos_acceptor_learn_handle(as_clustering_internal_event* event)
{
	cf_node src_nodeid = event->msg_src_nodeid;
	msg* msg = event->msg;

	DEBUG("received paxos learn from node %"PRIx64, src_nodeid);

	if (!clustering_can_accept_as_proposer(src_nodeid)) {
		INFO("ignoring learn message from a non-principal node %"PRIx64" because we are already in a cluster",
				src_nodeid);
		return;
	}

	// Its ok to proceed even if paxos is running, because this could be a
	// competing proposal and the winner was decided by paxos sequence number.
	as_paxos_proposal_id proposal_id = { 0 };
	if (msg_event_proposal_id_get(event, &proposal_id) != 0) {
		INFO("ignoring paxos learn from node %"PRIx64"with invalid proposal id",
				src_nodeid);
		return;
	}

	CLUSTERING_LOCK();

	// A learn is only meaningful once this acceptor has accepted a proposal.
	if (g_acceptor.state != AS_PAXOS_ACCEPTOR_STATE_ACCEPTED) {
		INFO(
				"ignoring paxos learn from node %"PRIx64" - proposal id (%"PRIx64":%"PRIu64") we are already in a cluster",
				src_nodeid, proposal_id.src_nodeid,
				proposal_id.sequence_number);
		goto Exit;
	}

	if (paxos_proposal_id_compare(&proposal_id,
			&g_acceptor.last_proposal_received_id) != 0) {
		// We have not promised nor accepted this proposal,
		// ignore the learn message.
		INFO(
				"ignoring paxos learn from node %"PRIx64" - proposal id (%"PRIx64":%"PRIu64") mismatches current proposal id (%"PRIx64":%"PRIu64")",
				src_nodeid, proposal_id.src_nodeid,
				proposal_id.sequence_number,
				g_acceptor.last_proposal_received_id.src_nodeid,
				g_acceptor.last_proposal_received_id.sequence_number);
		goto Exit;
	}

	// Extract the consensus value from the message. NOTE: earlier gotos jump
	// past these declarations, which is legal here since the Exit label never
	// touches them; only Exit_destory_succession uses the vector, and it is
	// reached only after initialization.
	as_cluster_key new_cluster_key = 0;
	cf_vector* new_succession_list = vector_stack_lockless_create(cf_node);

	if (msg_cluster_key_get(msg, &new_cluster_key) != 0) {
		INFO("ignoring paxos learn from node %"PRIx64" without cluster key",
				src_nodeid);
		goto Exit_destory_succession;
	}

	if (msg_succession_list_get(msg, new_succession_list) != 0) {
		INFO("ignoring paxos learn from node %"PRIx64" without succession list",
				src_nodeid);
		goto Exit_destory_succession;
	}

	if (new_cluster_key == g_register.cluster_key) {
		if (!vector_equals(new_succession_list, &g_register.succession_list)) {
			// We have the same cluster key repeated for a new round. Should
			// never happen.
			CRASH("duplicate cluster key %"PRIx64" generated for different paxos rounds - disastrous", new_cluster_key);
		}

		INFO("ignoring duplicate paxos learn from node %"PRIx64, src_nodeid);
		goto Exit_destory_succession;
	}

	// Paxos round converged, apply the new cluster configuration.
	paxos_acceptor_success(new_cluster_key, new_succession_list,
			proposal_id.sequence_number);

Exit_destory_succession:
	cf_vector_destroy(new_succession_list);

Exit:
	CLUSTERING_UNLOCK();
}
5176
5177/**
5178 * Handle an incoming message.
5179 */
5180static void
5181paxos_acceptor_msg_event_handle(as_clustering_internal_event *msg_event)
5182{
5183 switch (msg_event->msg_type) {
5184 case AS_CLUSTERING_MSG_TYPE_PAXOS_PREPARE:
5185 paxos_acceptor_prepare_handle(msg_event);
5186 break;
5187 case AS_CLUSTERING_MSG_TYPE_PAXOS_ACCEPT:
5188 paxos_acceptor_accept_handle(msg_event);
5189 break;
5190 case AS_CLUSTERING_MSG_TYPE_PAXOS_LEARN:
5191 paxos_acceptor_learn_handle(msg_event);
5192 break;
5193 default: // Other message types are not of interest.
5194 break;
5195 }
5196}
5197
5198/**
5199 * Check and retransmit promise message if paxos proposer has not moved ahead
5200 * and send back an accept message.
5201 */
5202static void
5203paxos_acceptor_promise_check_retransmit()
5204{
5205 CLUSTERING_LOCK();
5206 cf_clock now = cf_getms();
5207 if (g_acceptor.state == AS_PAXOS_ACCEPTOR_STATE_PROMISED
5208 && g_acceptor.promise_send_time + paxos_msg_timeout() < now) {
5209 paxos_acceptor_promise_send(
5210 g_acceptor.last_proposal_received_id.src_nodeid,
5211 g_acceptor.last_proposal_received_id.sequence_number);
5212 }
5213 CLUSTERING_UNLOCK();
5214}
5215
5216/**
5217 * Check and retransmit accepted message if paxos proposer has not send back a
5218 * learn message.
5219 */
5220static void
5221paxos_acceptor_accepted_check_retransmit()
5222{
5223 CLUSTERING_LOCK();
5224 cf_clock now = cf_getms();
5225 if (g_acceptor.state == AS_PAXOS_ACCEPTOR_STATE_ACCEPTED
5226 && g_acceptor.accepted_send_time + paxos_msg_timeout() < now) {
5227 paxos_acceptor_accepted_send(
5228 g_acceptor.last_proposal_received_id.src_nodeid,
5229 g_acceptor.last_proposal_received_id.sequence_number);
5230 }
5231 CLUSTERING_UNLOCK();
5232}
5233
5234/**
5235 * Handle a timer event and retransmit messages if required.
5236 */
5237static void
5238paxos_acceptor_timer_event_handle()
5239{
5240 CLUSTERING_LOCK();
5241 switch (g_acceptor.state) {
5242 case AS_PAXOS_ACCEPTOR_STATE_IDLE: {
5243 // No retransmitts required.
5244 break;
5245 }
5246 case AS_PAXOS_ACCEPTOR_STATE_PROMISED:
5247 paxos_acceptor_promise_check_retransmit();
5248 break;
5249 case AS_PAXOS_ACCEPTOR_STATE_ACCEPTED:
5250 paxos_acceptor_accepted_check_retransmit();
5251 break;
5252 }
5253
5254 CLUSTERING_UNLOCK();
5255}
5256
5257/**
5258 * Initialize paxos acceptor state.
5259 */
5260static void
5261paxos_acceptor_init()
5262{
5263 CLUSTERING_LOCK();
5264 // Memset to zero which ensures that all acceptor state variables have zero
5265 // which is the correct initial value for elements other that contained
5266 // vectors and status.
5267 memset(&g_acceptor, 0, sizeof(g_acceptor));
5268 g_acceptor.state = AS_PAXOS_ACCEPTOR_STATE_IDLE;
5269 CLUSTERING_UNLOCK();
5270}
5271
5272/**
5273 * Paxos acceptor monitor to detect and cleanup long running and most likely
5274 * failed paxos rounds.
5275 */
5276static void
5277paxos_acceptor_monitor()
5278{
5279 CLUSTERING_LOCK();
5280 if (g_acceptor.state != AS_PAXOS_ACCEPTOR_STATE_IDLE
5281 && g_acceptor.acceptor_round_start + paxos_proposal_timeout()
5282 <= cf_getms()) {
5283 // Paxos round is running and has timed out.
5284 // Consider paxos round failed.
5285 INFO("paxos round timed out for proposal id %"PRIx64":%"PRIu64,
5286 config_self_nodeid_get(),
5287 g_proposer.sequence_number);
5288 paxos_acceptor_fail();
5289 }
5290 CLUSTERING_UNLOCK();
5291}
5292
5293/*
5294 * ----------------------------------------------------------------------------
5295 * Paxos lifecycle and common event handling
5296 * ----------------------------------------------------------------------------
5297 */
5298
5299/**
5300 * Paxos monitor to detect and cleanup long running and most likely failed paxos
5301 * rounds.
5302 */
5303static void
5304paxos_monitor()
5305{
5306 paxos_proposer_monitor();
5307 paxos_acceptor_monitor();
5308}
5309
5310/**
5311 * Handle an incoming timer event.
5312 */
5313static void
5314paxos_timer_event_handle()
5315{
5316 // Acceptor retransmits handled here.
5317 paxos_acceptor_timer_event_handle();
5318
5319 // Proposer retransmits handled here.
5320 paxos_proposer_timer_event_handle();
5321
5322 // Invoke Paxos monitor to timeout long running paxos rounds.
5323 paxos_monitor();
5324}
5325
5326/**
5327 * Handle incoming messages.
5328 */
5329static void
5330paxos_msg_event_handle(as_clustering_internal_event* msg_event)
5331{
5332 paxos_acceptor_msg_event_handle(msg_event);
5333 paxos_proposer_msg_event_handle(msg_event);
5334}
5335
5336/**
5337 * Handle heartbeat event.
5338 */
5339static void
5340paxos_hb_event_handle(as_clustering_internal_event* hb_event)
5341{
5342 paxos_proposer_hb_event_handle(hb_event);
5343}
5344
5345/**
5346 * Dispatch clustering events.
5347 */
5348static void
5349paxos_event_dispatch(as_clustering_internal_event* event)
5350{
5351 switch (event->type) {
5352 case AS_CLUSTERING_INTERNAL_EVENT_TIMER:
5353 paxos_timer_event_handle();
5354 break;
5355 case AS_CLUSTERING_INTERNAL_EVENT_MSG:
5356 paxos_msg_event_handle(event);
5357 break;
5358 case AS_CLUSTERING_INTERNAL_EVENT_HB:
5359 paxos_hb_event_handle(event);
5360 break;
5361 case AS_CLUSTERING_INTERNAL_EVENT_REGISTER_CLUSTER_SYNCED:
5362 paxos_proposer_register_synched();
5363 default: // Not of interest for paxos.
5364 break;
5365 }
5366}
5367
5368/**
5369 * Initialize paxos proposer and acceptor data structures.
5370 */
5371static void
5372paxos_init()
5373{
5374 paxos_proposer_init();
5375 paxos_acceptor_init();
5376}
5377
5378/*
5379 * ----------------------------------------------------------------------------
5380 * Clustering external event publisher
5381 * ----------------------------------------------------------------------------
5382 */
5383
5384/**
5385 * * Check if event publisher is running.
5386 */
5387static bool
5388external_event_publisher_is_running()
5389{
5390 CLUSTERING_EVENT_PUBLISHER_LOCK();
5391 bool running = g_external_event_publisher.sys_state
5392 == AS_CLUSTERING_SYS_STATE_RUNNING;
5393 CLUSTERING_EVENT_PUBLISHER_UNLOCK();
5394 return running;
5395}
5396
5397/**
5398 * Initialize the event publisher.
5399 */
5400static void
5401external_event_publisher_init()
5402{
5403 CLUSTERING_EVENT_PUBLISHER_LOCK();
5404 memset(&g_external_event_publisher, 0, sizeof(g_external_event_publisher));
5405 vector_lockless_init(&g_external_event_publisher.published_succession_list,
5406 cf_node);
5407
5408 pthread_mutex_init(&g_external_event_publisher.is_pending_mutex, NULL);
5409 pthread_cond_init(&g_external_event_publisher.is_pending, NULL);
5410 CLUSTERING_EVENT_PUBLISHER_UNLOCK();
5411}
5412
5413/**
5414 * Wakeup the publisher thread.
5415 */
5416static void
5417external_event_publisher_thr_wakeup()
5418{
5419 pthread_mutex_lock(&g_external_event_publisher.is_pending_mutex);
5420 pthread_cond_signal(&g_external_event_publisher.is_pending);
5421 pthread_mutex_unlock(&g_external_event_publisher.is_pending_mutex);
5422}
5423
5424/**
5425 * Queue up and external event to publish.
5426 */
5427static void
5428external_event_queue(as_clustering_event* event)
5429{
5430 CLUSTERING_EVENT_PUBLISHER_LOCK();
5431 memcpy(&g_external_event_publisher.to_publish, event,
5432 sizeof(g_external_event_publisher.to_publish));
5433
5434 vector_clear(&g_external_event_publisher.published_succession_list);
5435 if (event->succession_list) {
5436 // Use the static list for the published event, so that the input event
5437 // object can be destroyed irrespective of when the it is published.
5438 vector_copy(&g_external_event_publisher.published_succession_list,
5439 event->succession_list);
5440 g_external_event_publisher.to_publish.succession_list =
5441 &g_external_event_publisher.published_succession_list;
5442
5443 }
5444
5445 g_external_event_publisher.event_queued = true;
5446
5447 CLUSTERING_EVENT_PUBLISHER_UNLOCK();
5448
5449 // Wake up the publisher thread.
5450 external_event_publisher_thr_wakeup();
5451}
5452
5453/**
5454 * Publish external events if any are pending.
5455 */
5456static void
5457external_events_publish()
5458{
5459 CLUSTERING_EVENT_PUBLISHER_LOCK();
5460
5461 if (g_external_event_publisher.event_queued) {
5462 g_external_event_publisher.event_queued = false;
5463 exchange_clustering_event_listener(
5464 &g_external_event_publisher.to_publish);
5465 }
5466 CLUSTERING_EVENT_PUBLISHER_UNLOCK();
5467}
5468
5469/**
5470 * External event publisher thread.
5471 */
5472static void*
5473external_event_publisher_thr(void* arg)
5474{
5475 pthread_mutex_lock(&g_external_event_publisher.is_pending_mutex);
5476
5477 while (true) {
5478 pthread_cond_wait(&g_external_event_publisher.is_pending,
5479 &g_external_event_publisher.is_pending_mutex);
5480 if (external_event_publisher_is_running()) {
5481 external_events_publish();
5482 }
5483 else {
5484 // Publisher stopped, exit the tread.
5485 break;
5486 }
5487 }
5488
5489 pthread_mutex_unlock(&g_external_event_publisher.is_pending_mutex);
5490 return NULL;
5491}
5492
5493/**
5494 * Start the event publisher.
5495 */
5496static void
5497external_event_publisher_start()
5498{
5499 CLUSTERING_EVENT_PUBLISHER_LOCK();
5500 g_external_event_publisher.sys_state = AS_CLUSTERING_SYS_STATE_RUNNING;
5501 g_external_event_publisher.event_publisher_tid =
5502 cf_thread_create_joinable(external_event_publisher_thr, NULL);
5503 CLUSTERING_EVENT_PUBLISHER_UNLOCK();
5504}
5505
5506/**
5507 * Stop the event publisher.
5508 */
5509static void
5510external_event_publisher_stop()
5511{
5512 CLUSTERING_EVENT_PUBLISHER_LOCK();
5513 g_external_event_publisher.sys_state =
5514 AS_CLUSTERING_SYS_STATE_SHUTTING_DOWN;
5515 CLUSTERING_EVENT_PUBLISHER_UNLOCK();
5516
5517 external_event_publisher_thr_wakeup();
5518 cf_thread_join(g_external_event_publisher.event_publisher_tid);
5519
5520 CLUSTERING_EVENT_PUBLISHER_LOCK();
5521 g_external_event_publisher.sys_state = AS_CLUSTERING_SYS_STATE_STOPPED;
5522 g_external_event_publisher.event_queued = false;
5523 CLUSTERING_EVENT_PUBLISHER_UNLOCK();
5524}
5525
5526/*
5527 * ----------------------------------------------------------------------------
5528 * Clustering register
5529 * ----------------------------------------------------------------------------
5530 */
5531
5532/**
5533 * Dump register state to logs.
5534 */
5535static void
5536register_dump(bool verbose)
5537{
5538 CLUSTERING_LOCK();
5539
5540 // Output register state.
5541 switch (g_register.state) {
5542 case AS_CLUSTERING_REGISTER_STATE_SYNCED:
5543 INFO("CL: register: synced");
5544 break;
5545 case AS_CLUSTERING_REGISTER_STATE_SYNCING:
5546 INFO("CL: register: syncing");
5547 break;
5548 }
5549
5550 // Cluster state details.
5551 INFO("CL: cluster changed at: %"PRIu64" now: %"PRIu64,
5552 g_register.cluster_modified_time, cf_getms());
5553
5554 INFO("CL: cluster key: %"PRIx64, g_register.cluster_key);
5555 INFO("CL: cluster sequence: %"PRIu64, g_register.sequence_number);
5556 INFO("CL: cluster size: %d", cf_vector_size(&g_register.succession_list));
5557
5558 if (verbose) {
5559 log_cf_node_vector("CL: succession:", &g_register.succession_list,
5560 CF_INFO);
5561 }
5562
5563 CLUSTERING_UNLOCK();
5564}
5565
5566/**
5567 * Initialize the register.
5568 */
5569static void
5570register_init()
5571{
5572 CLUSTERING_LOCK();
5573 memset(&g_register, 0, sizeof(g_register));
5574 vector_lockless_init(&g_register.succession_list, cf_node);
5575 vector_lockless_init(&g_register.sync_pending, cf_node);
5576 vector_lockless_init(&g_register.ooo_change_applied_received, cf_node);
5577 vector_lockless_init(&g_register.ooo_succession_list, cf_node);
5578
5579 // We are in the orphan state but that will be considered as sync state.
5580 g_register.state = AS_CLUSTERING_REGISTER_STATE_SYNCED;
5581 CLUSTERING_UNLOCK();
5582}
5583
5584/**
5585 * Returns true if register sync is pending.
5586 */
5587static bool
5588register_is_sycn_pending()
5589{
5590 CLUSTERING_LOCK();
5591 bool sync_pending = cf_vector_size(&g_register.sync_pending) > 0;
5592 log_cf_node_vector("pending register sync:", &g_register.sync_pending,
5593 CF_DETAIL);
5594 CLUSTERING_UNLOCK();
5595 return sync_pending;
5596}
5597
5598/**
5599 * Check if the register is synced across the cluster and move to sync state if
5600 * it is synced.
5601 */
5602static void
5603register_check_and_switch_synced()
5604{
5605 CLUSTERING_LOCK();
5606 if (!register_is_sycn_pending()
5607 && g_register.state != AS_CLUSTERING_REGISTER_STATE_SYNCED) {
5608 g_register.state = AS_CLUSTERING_REGISTER_STATE_SYNCED;
5609 // Generate internal cluster changed synced.
5610 as_clustering_internal_event cluster_synced;
5611 memset(&cluster_synced, 0, sizeof(cluster_synced));
5612 cluster_synced.type =
5613 AS_CLUSTERING_INTERNAL_EVENT_REGISTER_CLUSTER_SYNCED;
5614 internal_event_dispatch(&cluster_synced);
5615 }
5616 CLUSTERING_UNLOCK();
5617}
5618
5619/**
5620 * Update register to become an orphan node.
5621 */
5622static void
5623register_become_orphan(as_clustering_event_qualifier qualifier)
5624{
5625 CLUSTERING_LOCK();
5626 g_register.state = AS_CLUSTERING_REGISTER_STATE_SYNCED;
5627 g_register.cluster_key = 0;
5628 g_register.sequence_number = 0;
5629 g_register.has_orphan_transitioned = true;
5630 g_clustering.has_integrity = false;
5631 vector_clear(&g_register.succession_list);
5632 vector_clear(&g_register.sync_pending);
5633
5634 g_register.cluster_modified_time = cf_getms();
5635 g_register.cluster_modified_hlc_ts = as_hlc_timestamp_now();
5636
5637 // Queue internal orphaned event.
5638 as_clustering_internal_event orphaned_event;
5639 memset(&orphaned_event, 0, sizeof(orphaned_event));
5640 orphaned_event.type = AS_CLUSTERING_INTERNAL_EVENT_REGISTER_ORPHANED;
5641 orphaned_event.qualifier = qualifier;
5642 internal_event_dispatch(&orphaned_event);
5643
5644 CLUSTERING_UNLOCK();
5645
5646 INFO("moved self node to orphan state");
5647}
5648
5649/**
5650 * Handle timer event in the syncing state.
5651 */
5652static void
5653register_syncing_timer_event_handle()
5654{
5655 CLUSTERING_LOCK();
5656 cf_clock now = cf_getms();
5657 if (g_register.last_sync_check_time + register_sync_check_interval()
5658 > now) {
5659 // Give more time before checking for sync.
5660 goto Exit;
5661 }
5662
5663 if (register_is_sycn_pending()) {
5664 // Update pending nodes based on heartbeat status.
5665 int num_pending = cf_vector_size(&g_register.sync_pending);
5666 for (int i = 0; i < num_pending; i++) {
5667 cf_node pending;
5668 cf_vector_get(&g_register.sync_pending, i, &pending);
5669 if (clustering_node_is_sync(pending)) {
5670 cf_vector_delete(&g_register.sync_pending, i);
5671
5672 // Compensate the index for the delete.
5673 i--;
5674
5675 // Adjust vector size.
5676 num_pending--;
5677 }
5678 }
5679 }
5680
5681 register_check_and_switch_synced();
5682
5683Exit:
5684 CLUSTERING_UNLOCK();
5685}
5686
5687/**
5688 * Send cluster change applied message to all cluster members.
5689 */
5690static void
5691register_cluster_change_applied_msg_send()
5692{
5693 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_CLUSTER_CHANGE_APPLIED);
5694
5695 CLUSTERING_LOCK();
5696
5697 // Set the cluster key.
5698 msg_cluster_key_set(msg, g_register.cluster_key);
5699
5700 // Set the succession list.
5701 msg_succession_list_set(msg, &g_register.succession_list);
5702
5703 log_cf_node_vector("cluster change applied message sent to:",
5704 &g_register.succession_list, CF_DEBUG);
5705
5706 cf_vector* members = vector_stack_lockless_create(cf_node);
5707 vector_copy(members, &g_register.succession_list);
5708
5709 CLUSTERING_UNLOCK();
5710
5711 // Sent the message to the cluster members.
5712 msg_nodes_send(msg, members);
5713 cf_vector_destroy(members);
5714}
5715
5716/**
5717 * Validate cluster state. For now ensure the cluster size is greater than the
5718 * min cluster size.
5719 */
5720static void
5721register_validate_cluster()
5722{
5723 CLUSTERING_LOCK();
5724 int cluster_size = cf_vector_size(&g_register.succession_list);
5725 if (!clustering_is_orphan()
5726 && cluster_size < g_config.clustering_config.cluster_size_min) {
5727 WARNING(
5728 "cluster size %d less than required minimum size %d - switching to orphan state",
5729 cluster_size, g_config.clustering_config.cluster_size_min);
5730 register_become_orphan (AS_CLUSTERING_MEMBERSHIP_LOST);
5731 }
5732 CLUSTERING_UNLOCK();
5733}
5734
5735/**
5736 * Handle a timer event for the register.
5737 */
5738static void
5739register_timer_event_handle()
5740{
5741 CLUSTERING_LOCK();
5742 switch (g_register.state) {
5743 case AS_CLUSTERING_REGISTER_STATE_SYNCED:
5744 register_validate_cluster();
5745 break;
5746 case AS_CLUSTERING_REGISTER_STATE_SYNCING:
5747 register_syncing_timer_event_handle();
5748 break;
5749 }
5750 CLUSTERING_UNLOCK();
5751}
5752
5753/**
5754 * Handle paxos round succeeding.
5755 */
5756static void
5757register_paxos_acceptor_success_handle(
5758 as_clustering_internal_event* paxos_success_event)
5759{
5760 CLUSTERING_LOCK();
5761
5762 g_register.has_orphan_transitioned = false;
5763
5764 g_register.cluster_key = paxos_success_event->new_cluster_key;
5765 g_register.sequence_number = paxos_success_event->new_sequence_number;
5766
5767 vector_clear(&g_register.succession_list);
5768 vector_copy(&g_register.succession_list,
5769 paxos_success_event->new_succession_list);
5770
5771 // Update the timestamps as the register has changed its contents.
5772 g_register.cluster_modified_time = cf_getms();
5773 g_register.cluster_modified_hlc_ts = as_hlc_timestamp_now();
5774
5775 // Initialize pending list with all cluster members.
5776 g_register.state = AS_CLUSTERING_REGISTER_STATE_SYNCING;
5777 vector_clear(&g_register.sync_pending);
5778 vector_copy(&g_register.sync_pending, &g_register.succession_list);
5779 register_cluster_change_applied_msg_send();
5780
5781 if (g_register.cluster_key == g_register.ooo_cluster_key
5782 && vector_equals(&g_register.succession_list,
5783 &g_register.ooo_succession_list)) {
5784 // We have already received change applied message from these node
5785 // account for them.
5786 vector_subtract(&g_register.sync_pending,
5787 &g_register.ooo_change_applied_received);
5788 }
5789 vector_clear(&g_register.ooo_change_applied_received);
5790 vector_clear(&g_register.ooo_succession_list);
5791 g_register.ooo_cluster_key = 0;
5792 g_register.ooo_hlc_timestamp = 0;
5793
5794 INFO("applied new cluster key %"PRIx64,
5795 paxos_success_event->new_cluster_key);
5796 log_cf_node_vector("applied new succession list",
5797 &g_register.succession_list, CF_INFO);
5798 INFO("applied cluster size %d",
5799 cf_vector_size(&g_register.succession_list));
5800
5801 as_clustering_internal_event cluster_changed;
5802 memset(&cluster_changed, 0, sizeof(cluster_changed));
5803 cluster_changed.type =
5804 AS_CLUSTERING_INTERNAL_EVENT_REGISTER_CLUSTER_CHANGED;
5805 internal_event_dispatch(&cluster_changed);
5806
5807 // Send change appied message. Its alright even if they are out of order.
5808 register_cluster_change_applied_msg_send();
5809
5810 CLUSTERING_UNLOCK();
5811}
5812
5813/**
5814 * Handle incoming cluster change applied message.
5815 */
5816static void
5817register_cluster_change_applied_msg_handle(
5818 as_clustering_internal_event* msg_event)
5819{
5820 CLUSTERING_LOCK();
5821 as_cluster_key msg_cluster_key = 0;
5822 msg_cluster_key_get(msg_event->msg, &msg_cluster_key);
5823 cf_vector *msg_succession_list = vector_stack_lockless_create(cf_node);
5824 msg_succession_list_get(msg_event->msg, msg_succession_list);
5825 as_hlc_timestamp msg_hlc_timestamp = 0;
5826 msg_send_ts_get(msg_event->msg, &msg_hlc_timestamp);
5827
5828 DEBUG("received cluster change applied message from node %"PRIx64,
5829 msg_event->msg_src_nodeid);
5830 if (g_register.cluster_key == msg_cluster_key
5831 && vector_equals(&g_register.succession_list,
5832 msg_succession_list)) {
5833 // This is a matching change applied message.
5834 int found_at = 0;
5835 if ((found_at = vector_find(&g_register.sync_pending,
5836 &msg_event->msg_src_nodeid)) >= 0) {
5837 // Remove from the pending list.
5838 cf_vector_delete(&g_register.sync_pending, found_at);
5839 }
5840
5841 }
5842 else if (g_register.ooo_cluster_key == msg_cluster_key
5843 && vector_equals(&g_register.ooo_succession_list,
5844 msg_succession_list)) {
5845 DEBUG("received ooo cluster change applied message from node %"PRIx64" with cluster key %"PRIx64, msg_event->msg_src_nodeid, msg_cluster_key);
5846 cf_vector_append_unique(&g_register.ooo_change_applied_received,
5847 &msg_event->msg_src_nodeid);
5848
5849 }
5850 else if (g_register.ooo_hlc_timestamp < msg_hlc_timestamp) {
5851 // Prefer a later version of OOO message.
5852 g_register.ooo_cluster_key = msg_cluster_key;
5853 g_register.ooo_hlc_timestamp = msg_hlc_timestamp;
5854 vector_clear(&g_register.ooo_succession_list);
5855 vector_copy(&g_register.ooo_succession_list, msg_succession_list);
5856 vector_clear(&g_register.ooo_change_applied_received);
5857 cf_vector_append_unique(&g_register.ooo_change_applied_received,
5858 &msg_event->msg_src_nodeid);
5859 DEBUG("received ooo cluster change applied message from node %"PRIx64" with cluster key %"PRIx64, msg_event->msg_src_nodeid, msg_cluster_key);
5860 }
5861 else {
5862 INFO(
5863 "ignoring cluster mismatching change applied message from node %"PRIx64,
5864 msg_event->msg_src_nodeid);
5865 }
5866 cf_vector_destroy(msg_succession_list);
5867 register_check_and_switch_synced();
5868 CLUSTERING_UNLOCK();
5869}
5870
5871/**
5872 * Handle incoming message.
5873 */
5874static void
5875register_msg_event_handle(as_clustering_internal_event* msg_event)
5876{
5877 CLUSTERING_LOCK();
5878 as_clustering_msg_type type;
5879 msg_type_get(msg_event->msg, &type);
5880
5881 if (type == AS_CLUSTERING_MSG_TYPE_CLUSTER_CHANGE_APPLIED) {
5882 register_cluster_change_applied_msg_handle(msg_event);
5883 }
5884 CLUSTERING_UNLOCK();
5885}
5886
5887/**
5888 * Dispatch internal events to the register.
5889 */
5890static void
5891register_event_dispatch(as_clustering_internal_event* event)
5892{
5893 switch (event->type) {
5894 case AS_CLUSTERING_INTERNAL_EVENT_TIMER:
5895 register_timer_event_handle();
5896 break;
5897 case AS_CLUSTERING_INTERNAL_EVENT_PAXOS_ACCEPTOR_SUCCESS:
5898 register_paxos_acceptor_success_handle(event);
5899 break;
5900 case AS_CLUSTERING_INTERNAL_EVENT_MSG:
5901 register_msg_event_handle(event);
5902 break;
5903 default: // Not of interest for the register.
5904 break;
5905 }
5906}
5907
5908/*
5909 * ----------------------------------------------------------------------------
5910 * Clustering core (triggers cluster changes)
5911 * ----------------------------------------------------------------------------
5912 */
5913
5914/**
5915 * Send a join reject message to destination node.
5916 */
5917static void
5918clustering_join_reject_send(cf_node dest)
5919{
5920 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_JOIN_REJECT);
5921
5922 DETAIL("sent join reject to node %"PRIx64, dest);
5923
5924 // Sent the message to the acceptors.
5925 msg_node_send(msg, dest);
5926}
5927
5928/**
5929 * Send cluster join reject message to all nodes in the vector.
5930 */
5931static void
5932clustering_join_requests_reject(cf_vector* rejected_nodes)
5933{
5934 int rejected_node_count = cf_vector_size(rejected_nodes);
5935 for (int i = 0; i < rejected_node_count; i++) {
5936 // No null check required since we are iterating under a lock and within
5937 // vector bounds.
5938 cf_node requesting_nodeid = *((cf_node*)cf_vector_getp(rejected_nodes,
5939 i));
5940
5941 // Send the reject message.
5942 clustering_join_reject_send(requesting_nodeid);
5943 }
5944}
5945
5946/**
5947 * Send join reject message for all pending join requests.
5948 */
5949static void
5950clustering_join_requests_reject_all()
5951{
5952 CLUSTERING_LOCK();
5953
5954 cf_vector* rejected_nodes = vector_stack_lockless_create(cf_node);
5955 vector_copy_unique(rejected_nodes, &g_clustering.pending_join_requests);
5956
5957 vector_clear(&g_clustering.pending_join_requests);
5958
5959 CLUSTERING_UNLOCK();
5960
5961 clustering_join_requests_reject(rejected_nodes);
5962
5963 cf_vector_destroy(rejected_nodes);
5964}
5965
5966/**
5967 * Send a join request to a principal.
5968 * @param new_principal the destination principal node.
5969 * @return 0 on successful message queue, -1 on failure.
5970 */
5971static int
5972clustering_join_request_send(cf_node new_principal)
5973{
5974 int rv = -1;
5975 CLUSTERING_LOCK();
5976
5977 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_JOIN_REQUEST);
5978
5979 DETAIL("sending cluster join request to node %"PRIx64, new_principal);
5980
5981 if (msg_node_send(msg, new_principal) == 0) {
5982 cf_clock now = cf_getms();
5983 cf_shash_put(g_clustering.join_request_blackout, &new_principal, &now);
5984
5985 g_clustering.last_join_request_principal = new_principal;
5986 g_clustering.last_join_request_sent_time =
5987 g_clustering.last_join_request_retransmit_time = cf_getms();
5988
5989 INFO("sent cluster join request to %"PRIx64, new_principal);
5990 rv = 0;
5991 }
5992
5993 // Send early reject to all nodes that have send us a join request in the
5994 // orphan state, because self node is not going to become a principal node.
5995 // This allows the requesting nodes to send requests to other
5996 // (potential)principals.
5997 clustering_join_requests_reject_all();
5998
5999 CLUSTERING_UNLOCK();
6000 return rv;
6001}
6002
6003/**
6004 * Retransmit a join request to a previously attmepted principal.
6005 * @param last_join_request_principal the principal to retransmit to.
6006 */
6007static void
6008clustering_join_request_retransmit(cf_node last_join_request_principal)
6009{
6010 CLUSTERING_LOCK();
6011 cf_node new_principal = g_clustering.last_join_request_principal;
6012 g_clustering.last_join_request_retransmit_time = cf_getms();
6013 CLUSTERING_UNLOCK();
6014
6015 if (new_principal != last_join_request_principal) {
6016 // The last attempted principal has changed. Don't retransmit.
6017 return;
6018 }
6019
6020 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_JOIN_REQUEST);
6021 DETAIL("re-sending cluster join request to node %"PRIx64, new_principal);
6022 if (msg_node_send(msg, new_principal) == 0) {
6023 DEBUG("re-sent cluster join request to %"PRIx64, new_principal);
6024 }
6025}
6026
6027/**
6028 * Remove nodes for which join requests are blocked.
6029 *
6030 * @param requestees the nodes considered for join requests.
6031 * @param target the result with requestees that are not blocked.
6032 */
6033static void
6034clustering_join_request_filter_blocked(cf_vector* requestees, cf_vector* target)
6035{
6036 CLUSTERING_LOCK();
6037 cf_clock last_sent;
6038 int requestee_count = cf_vector_size(requestees);
6039 for (int i = 0; i < requestee_count; i++) {
6040 cf_node requestee;
6041 cf_vector_get(requestees, i, &requestee);
6042 if (cf_shash_get(g_clustering.join_request_blackout, &requestee,
6043 &last_sent) != CF_SHASH_OK) {
6044 // The requestee is not marked for blackout
6045 cf_vector_append(target, &requestee);
6046 }
6047 }
6048 CLUSTERING_UNLOCK();
6049}
6050
6051/**
6052 * Send a cluster join request to a neighboring principal. If
6053 * preferred_principal is set and it is an eligible neighboring principal, a
6054 * request is sent to that principal, else this function cycles among eligible
6055 * neighboring principals at each call.
6056 *
6057 * A request will not be sent if there is no neighboring principal.
6058 *
6059 * @param preferred_principal the preferred principal to join. User zero if
6060 * there is no preference.
6061 * @return 0 if the join request was send or there is one in progress. -1 if
6062 * there are no principals to try and send the join request.
6063 */
6064static as_clustering_join_request_result
6065clustering_principal_join_request_attempt(cf_node preferred_principal)
6066{
6067 CLUSTERING_LOCK();
6068
6069 as_clustering_join_request_result rv = AS_CLUSTERING_JOIN_REQUEST_SENT;
6070 cf_vector* neighboring_principals = vector_stack_lockless_create(cf_node);
6071 cf_vector* eligible_principals = vector_stack_lockless_create(cf_node);
6072
6073 // Get list of neighboring principals.
6074 clustering_neighboring_principals_get(neighboring_principals);
6075 if (cf_vector_size(neighboring_principals) == 0) {
6076 DEBUG("no neighboring principal found - not sending join request");
6077 rv = AS_CLUSTERING_JOIN_REQUEST_NO_PRINCIPALS;
6078 goto Exit;
6079 }
6080
6081 clustering_join_request_filter_blocked(neighboring_principals,
6082 eligible_principals);
6083
6084 if (cf_vector_size(eligible_principals) == 0) {
6085 DETAIL("no eligible principals found to make a join request");
6086 // This principal is still in the blackout list. Do not send a request.
6087 rv = AS_CLUSTERING_JOIN_REQUEST_PENDING;
6088 goto Exit;
6089 }
6090
6091 int next_join_request_principal_index = -1;
6092
6093 // We have some well-formed neighboring clusters, try and join them
6094 if (preferred_principal != 0) {
6095 int preferred_principal_index = vector_find(eligible_principals,
6096 &preferred_principal);
6097 if (preferred_principal_index >= 0) {
6098 DETAIL("sending join request to preferred principal %"PRIx64,
6099 preferred_principal);
6100
6101 // Update the index of the principal to try.
6102 next_join_request_principal_index = preferred_principal_index;
6103 }
6104 }
6105
6106 if (next_join_request_principal_index == -1) {
6107 // Choose the first entry, since we have no valid preferred principal.
6108 next_join_request_principal_index = 0;
6109 if (g_clustering.last_join_request_principal != 0) {
6110 // Choose the node after the current principal. If the current
6111 // principal is not found we start at index 0 else the next index.
6112 next_join_request_principal_index = vector_find(eligible_principals,
6113 &g_clustering.last_join_request_principal) + 1;
6114 }
6115 }
6116
6117 // Forget the fact that a join request is pending for a principal.
6118 g_clustering.last_join_request_principal = 0;
6119
6120 cf_node* principal_to_try = cf_vector_getp(eligible_principals,
6121 next_join_request_principal_index
6122 % cf_vector_size(eligible_principals));
6123
6124 if (principal_to_try) {
6125 rv = clustering_join_request_send(*principal_to_try) == 0 ?
6126 AS_CLUSTERING_JOIN_REQUEST_SENT :
6127 AS_CLUSTERING_JOIN_REQUEST_SEND_FAILED;
6128
6129 }
6130 else {
6131 DEBUG("no neighboring principal found - not sending join request");
6132 rv = AS_CLUSTERING_JOIN_REQUEST_NO_PRINCIPALS;
6133 }
6134
6135Exit:
6136 if (rv != AS_CLUSTERING_JOIN_REQUEST_SENT) {
6137 // Forget the last principal we sent the join request to.
6138 g_clustering.last_join_request_principal = 0;
6139 g_clustering.last_join_request_sent_time = 0;
6140 }
6141
6142 CLUSTERING_UNLOCK();
6143
6144 cf_vector_destroy(neighboring_principals);
6145 cf_vector_destroy(eligible_principals);
6146
6147 return rv;
6148}
6149
6150/**
6151 * Send a cluster join request to a neighboring orphan who this node thinks will
6152 * be best suited to form a new cluster.
6153 */
6154static as_clustering_join_request_result
6155clustering_orphan_join_request_attempt()
6156{
6157 CLUSTERING_LOCK();
6158
6159 // Get list of neighboring orphans.
6160 cf_vector* orphans = vector_stack_lockless_create(cf_node);
6161 clustering_neighboring_orphans_get(orphans);
6162
6163 // Get filtered list of orphans.
6164 cf_vector* new_succession_list = vector_stack_lockless_create(cf_node);
6165 clustering_join_request_filter_blocked(orphans, new_succession_list);
6166
6167 log_cf_node_vector("neighboring orphans for join request:",
6168 new_succession_list, CF_DEBUG);
6169
6170 // Add self node.
6171 cf_node self_nodeid = config_self_nodeid_get();
6172 cf_vector_append_unique(new_succession_list, &self_nodeid);
6173
6174 clustering_succession_list_clique_evict(new_succession_list,
6175 "clique based evicted nodes for potential cluster:");
6176
6177 // Sort the new succession list.
6178 vector_sort_unique(new_succession_list, cf_node_compare_desc);
6179
6180 as_clustering_join_request_result rv =
6181 AS_CLUSTERING_JOIN_REQUEST_NO_PRINCIPALS;
6182
6183 if (cf_vector_size(new_succession_list) > 0) {
6184 cf_node new_principal = *((cf_node*)cf_vector_getp(new_succession_list,
6185 0));
6186 if (new_principal == config_self_nodeid_get()) {
6187 // No need to send self a join request.
6188 goto Exit;
6189 }
6190 else {
6191 rv = clustering_join_request_send(new_principal) == 0 ?
6192 AS_CLUSTERING_JOIN_REQUEST_SENT :
6193 AS_CLUSTERING_JOIN_REQUEST_SEND_FAILED;
6194 }
6195 }
6196
6197Exit:
6198 cf_vector_destroy(new_succession_list);
6199 cf_vector_destroy(orphans);
6200
6201 CLUSTERING_UNLOCK();
6202 return rv;
6203}
6204
6205/**
6206 * Remove nodes from the blackout hash once they have been in the list for
6207 * greater than the blackout period.
6208 */
6209int
6210clustering_join_request_blackout_tend_reduce(const void* key, void* data,
6211 void* udata)
6212{
6213 cf_clock* join_request_send_time = (cf_clock*)data;
6214 if (*join_request_send_time + join_request_blackout_interval()
6215 < cf_getms()) {
6216 return CF_SHASH_REDUCE_DELETE;
6217 }
6218 return CF_SHASH_OK;
6219}
6220
6221/**
6222 * Tend the join request blackout data structure to remove blacked out
6223 * principals.
6224 */
6225static void
6226clustering_join_request_blackout_tend()
6227{
6228 CLUSTERING_LOCK();
6229 cf_shash_reduce(g_clustering.join_request_blackout,
6230 clustering_join_request_blackout_tend_reduce, NULL);
6231 CLUSTERING_UNLOCK();
6232}
6233
6234/**
6235 * Send a cluster join request to a neighboring principal if one exists, else if
6236 * there are no neighboring principals, send a join request to a neighboring
6237 * orphan node if this node thinks it will win paxos and become the new
6238 * principal.
6239 */
6240static as_clustering_join_request_result
6241clustering_join_request_attempt()
6242{
6243 clustering_join_request_blackout_tend();
6244
6245 CLUSTERING_LOCK();
6246 cf_node last_join_request_principal =
6247 g_clustering.last_join_request_principal;
6248 cf_clock last_join_request_sent_time =
6249 g_clustering.last_join_request_sent_time;
6250 cf_clock last_join_request_retransmit_time =
6251 g_clustering.last_join_request_retransmit_time;
6252 CLUSTERING_UNLOCK();
6253
6254 // Check if the outgoing join request has timed out.
6255 if (last_join_request_principal
6256 && as_hb_is_alive(last_join_request_principal)) {
6257 if (last_join_request_sent_time + join_request_timeout() > cf_getms()) {
6258 if (last_join_request_retransmit_time
6259 + join_request_retransmit_timeout() < cf_getms()) {
6260 // Re-transmit join request to the same principal, to cover the
6261 // case where the previous join request was lost.
6262 clustering_join_request_retransmit(last_join_request_principal);
6263 }
6264 // Wait for the principal to respond. do nothing
6265 DETAIL(
6266 "join request to principal %"PRIx64" pending - not attempting new join request",
6267 last_join_request_principal);
6268
6269 return AS_CLUSTERING_JOIN_REQUEST_PENDING;
6270 }
6271 // Timeout joining a principal. Choose a different principal.
6272 INFO("join request timed out for principal %"PRIx64,
6273 last_join_request_principal);
6274
6275 }
6276
6277 // Try sending a join request to a neighboring principal.
6278 as_clustering_join_request_result rv =
6279 clustering_principal_join_request_attempt(0);
6280
6281 if (rv != AS_CLUSTERING_JOIN_REQUEST_NO_PRINCIPALS) {
6282 // There are valid principals around. Don't send a request to
6283 // neighboring orphan nodes.
6284 return rv;
6285 }
6286
6287 // Send a join request to an orphan node, best suited to be the new
6288 // principal.
6289 return clustering_orphan_join_request_attempt();
6290}
6291
6292/**
6293 * Try to become a principal and start a new cluster.
6294 */
6295static void
6296clustering_cluster_form()
6297{
6298 ASSERT(clustering_is_orphan(),
6299 "should not attempt forming new cluster when not an orphan node");
6300
6301 CLUSTERING_LOCK();
6302 bool paxos_proposal_started = false;
6303 cf_vector* new_succession_list = vector_stack_lockless_create(cf_node);
6304 cf_vector* expected_succession_list = vector_stack_lockless_create(cf_node);
6305 cf_vector* orphans = vector_stack_lockless_create(cf_node);
6306
6307 clustering_neighboring_orphans_get(orphans);
6308 vector_copy(new_succession_list, orphans);
6309
6310 log_cf_node_vector("neighboring orphans for cluster formation:",
6311 new_succession_list,
6312 cf_vector_size(new_succession_list) > 0 ? CF_INFO : CF_DEBUG);
6313 log_cf_node_vector("pending join requests:",
6314 &g_clustering.pending_join_requests,
6315 cf_vector_size(&g_clustering.pending_join_requests) > 0 ?
6316 CF_INFO : CF_DEBUG);
6317
6318 // Add self node.
6319 cf_node self_nodeid = config_self_nodeid_get();
6320 cf_vector_append_unique(new_succession_list, &self_nodeid);
6321
6322 clustering_succession_list_clique_evict(new_succession_list,
6323 "clique based evicted nodes at cluster formation:");
6324
6325 // Sort the new succession list.
6326 vector_sort_unique(new_succession_list, cf_node_compare_desc);
6327
6328 cf_vector_append(expected_succession_list, &self_nodeid);
6329 vector_copy_unique(expected_succession_list,
6330 &g_clustering.pending_join_requests);
6331 // Sort the expected succession list.
6332 vector_sort_unique(expected_succession_list, cf_node_compare_desc);
6333 // The result should match the pending join requests exactly to consider the
6334 // new succession list.
6335 if (!vector_equals(expected_succession_list, new_succession_list)) {
6336 log_cf_node_vector(
6337 "skipping forming cluster - cannot form new cluster from pending join requests",
6338 &g_clustering.pending_join_requests, CF_INFO);
6339 goto Exit;
6340 }
6341
6342 if (cf_vector_size(orphans) > 0
6343 && cf_vector_size(new_succession_list) == 1) {
6344 log_cf_node_vector(
6345 "skipping forming cluster - there are neighboring orphans that cannot be clustered with",
6346 orphans, CF_INFO);
6347 goto Exit;
6348 }
6349
6350 if (cf_vector_size(new_succession_list) > 0) {
6351 cf_node new_principal = *((cf_node*)cf_vector_getp(new_succession_list,
6352 0));
6353 if (new_principal == config_self_nodeid_get()) {
6354 log_cf_node_vector(
6355 "principal node - forming new cluster with succession list:",
6356 new_succession_list, CF_INFO);
6357
6358 as_paxos_start_result result = paxos_proposer_proposal_start(
6359 new_succession_list, new_succession_list);
6360
6361 // Log paxos result.
6362 paxos_result_log(result, new_succession_list);
6363
6364 paxos_proposal_started = (result == AS_PAXOS_RESULT_STARTED);
6365 }
6366 else {
6367 INFO("skipping cluster formation - a new potential principal %"PRIx64" exists",
6368 new_principal);
6369 }
6370 }
6371
6372Exit:
6373 // Compute list of rejected nodes.
6374 if (paxos_proposal_started) {
6375 // Nodes in set (pending_join - new succession list) could not be
6376 // accomodated and should receive a join reject.
6377 vector_subtract(&g_clustering.pending_join_requests,
6378 new_succession_list);
6379 }
6380 else {
6381 // Reject all pending join requests. Will happen below.
6382 }
6383
6384 cf_vector* rejected_nodes = vector_stack_lockless_create(cf_node);
6385 vector_copy_unique(rejected_nodes, &g_clustering.pending_join_requests);
6386
6387 // Clear the pending join requests
6388 vector_clear(&g_clustering.pending_join_requests);
6389
6390 // Send reject messages to rejected nodes.
6391 clustering_join_requests_reject(rejected_nodes);
6392
6393 cf_vector_destroy(rejected_nodes);
6394
6395 cf_vector_destroy(orphans);
6396 cf_vector_destroy(expected_succession_list);
6397 cf_vector_destroy(new_succession_list);
6398
6399 CLUSTERING_UNLOCK();
6400}
6401
6402/**
6403 * Try to join a cluster if there is a neighboring one,
6404 * else try to form one.
6405 */
6406static void
6407clustering_join_or_form_cluster()
6408{
6409 ASSERT(clustering_is_orphan(),
6410 "should not attempt forming new cluster when not an orphan node");
6411
6412 if (paxos_proposer_proposal_is_active()) {
6413 // There is an active paxos round with this node as the proposed
6414 // principal.
6415 // Skip join cluster attempt and give current paxos round a chance to
6416 // form the cluster.
6417 return;
6418 }
6419
6420 CLUSTERING_LOCK();
6421
6422 // TODO (Discuss this): after some timeout and exhausting all neighboring
6423 // principals, become a single node cluster / try our own cluster. This
6424 // might not be required. Nonetheless discuss and figure this out. Current
6425 // behaviour is form new cluster after a timeout.
6426
6427 // A node is orphan for too long if it has attempted a join request which
6428 // timedout and its in orphan state for a while.
6429 bool orphan_for_too_long = (clustering_orphan_timeout()
6430 + g_clustering.orphan_state_start_time) < cf_getms()
6431 && g_clustering.last_join_request_principal
6432 && g_clustering.last_join_request_sent_time + join_request_timeout()
6433 < cf_getms();
6434
6435 if (orphan_for_too_long
6436 || clustering_join_request_attempt()
6437 == AS_CLUSTERING_JOIN_REQUEST_NO_PRINCIPALS) {
6438 // No neighboring principal found or we have been orphan for too long,
6439 // try and form a new cluster.
6440 clustering_cluster_form();
6441 }
6442 else {
6443 // A join request sent successfully or pending. Wait for the new
6444 // principal to respond.
6445
6446 // We are not going to be a principal node in this quantum, reject all
6447 // pending join requests.
6448 clustering_join_requests_reject_all();
6449 }
6450
6451 CLUSTERING_UNLOCK();
6452}
6453
6454/**
6455 * Get a list of nodes that need to be added to current succession list from
6456 * pending join requests. Bascially filters out node that are not orphans.
6457 */
6458static void
6459clustering_nodes_to_add_get(cf_vector* nodes_to_add)
6460{
6461 CLUSTERING_LOCK();
6462
6463 // Use a single iteration over the clustering data received via the
6464 // heartbeats instead of individual calls to get a consistent view and avoid
6465 // small lock and release.
6466 as_hb_plugin_data_iterate(&g_clustering.pending_join_requests,
6467 AS_HB_PLUGIN_CLUSTERING, clustering_orphan_nodes_find,
6468 nodes_to_add);
6469
6470 CLUSTERING_UNLOCK();
6471}
6472
6473/**
6474 * Handle quantum interval start in the orphan state. Try and join / form a
6475 * cluster.
6476 */
6477static void
6478clustering_orphan_quantum_interval_start_handle()
6479{
6480 if (!as_hb_self_is_duplicate()) {
6481 // Try to join a cluster or form a new one.
6482 clustering_join_or_form_cluster();
6483 }
6484}
6485
6486/**
6487 * Send a cluster move command to all nodes in the input list.
6488 *
6489 * @param candidate_principal the principal to which the other nodes should try
6490 * and join after receiving the move command.
6491 * @param cluster_key current cluster key for receiver validation.
6492 * @param nodeids the nodes to send move command to.
6493 */
6494static void
6495clustering_cluster_move_send(cf_node candidate_principal,
6496 as_cluster_key cluster_key, cf_vector* nodeids)
6497{
6498 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_MERGE_MOVE);
6499
6500 // Set the proposed principal.
6501 msg_proposed_principal_set(msg, candidate_principal);
6502
6503 // Set cluster key for message validation.
6504 msg_cluster_key_set(msg, cluster_key);
6505
6506 log_cf_node_vector("cluster merge move command sent to:", nodeids,
6507 CF_DEBUG);
6508
6509 // Sent the message to the acceptors.
6510 msg_nodes_send(msg, nodeids);
6511}
6512
6513/**
6514 * Update preferred principal votes using hb plugin data.
6515 */
6516static void
6517clustering_principal_preferred_principal_votes_count(cf_node nodeid,
6518 void* plugin_data, size_t plugin_data_size, cf_clock recv_monotonic_ts,
6519 as_hlc_msg_timestamp* msg_hlc_ts, void* udata)
6520{
6521 // A hash from each unique non null vinfo to a vector of partition ids
6522 // having the vinfo.
6523 cf_shash* preferred_principal_votes = (cf_shash*)udata;
6524
6525 CLUSTERING_LOCK();
6526 if (!clustering_hb_plugin_data_is_obsolete(
6527 g_register.cluster_modified_hlc_ts,
6528 g_register.cluster_modified_time, plugin_data, plugin_data_size,
6529 recv_monotonic_ts, msg_hlc_ts)) {
6530 cf_node* preferred_principal_p =
6531 clustering_hb_plugin_preferred_principal_get(plugin_data,
6532 plugin_data_size);
6533
6534 int current_votes = 0;
6535 if (cf_shash_get(preferred_principal_votes, preferred_principal_p,
6536 &current_votes) == CF_SHASH_OK) {
6537 current_votes++;
6538 }
6539 else {
6540 // We are seeing this preferred principal for the first time.
6541 current_votes = 0;
6542 }
6543
6544 cf_shash_put(preferred_principal_votes, preferred_principal_p,
6545 &current_votes);
6546 }
6547 else {
6548 DETAIL(
6549 "preferred principal voting skipped - found obsolete plugin data for node %"PRIx64,
6550 nodeid);
6551 }
6552 CLUSTERING_UNLOCK();
6553}
6554
6555/**
6556 * Get the preferred majority principal.
6557 */
6558static int
6559clustering_principal_preferred_principal_majority_find(const void* key,
6560 void* data, void* udata)
6561{
6562
6563 const cf_node* current_preferred_principal = (const cf_node*)key;
6564 int current_preferred_principal_votes = *(int*)data;
6565 cf_node* majority_preferred_principal = (cf_node*)udata;
6566
6567 CLUSTERING_LOCK();
6568 int preferred_principal_majority =
6569 (int)ceil(
6570 cf_vector_size(
6571 &g_register.succession_list) * AS_CLUSTERING_PREFERRRED_PRINCIPAL_MAJORITY);
6572 bool is_majority = current_preferred_principal_votes
6573 >= preferred_principal_majority;
6574 CLUSTERING_UNLOCK();
6575
6576 if (is_majority) {
6577 *majority_preferred_principal = *current_preferred_principal;
6578 // Majority found, halt reduce.
6579 return CF_SHASH_ERR_FOUND;
6580 }
6581
6582 return CF_SHASH_OK;
6583}
6584
6585/**
6586 * Get preferred principal based on a majority of non-principal's preferred
6587 * principals.
6588 * @return the preferred principal nodeid if there is a majority, else zero.
6589 */
6590static cf_node
6591clustering_principal_majority_preferred_principal_get()
6592{
6593 // A hash from each unique non null vinfo to a vector of partition ids
6594 // having the vinfo.
6595 cf_shash* preferred_principal_votes = cf_shash_create(cf_nodeid_shash_fn,
6596 sizeof(cf_node), sizeof(int), AS_CLUSTERING_CLUSTER_MAX_SIZE_SOFT,
6597 0);
6598
6599 CLUSTERING_LOCK();
6600
6601 // Use a single iteration over the clustering data received via the
6602 // heartbeats instead of individual calls to get a consistent view and avoid
6603 // small lock and release.
6604 as_hb_plugin_data_iterate(&g_register.succession_list,
6605 AS_HB_PLUGIN_CLUSTERING,
6606 clustering_principal_preferred_principal_votes_count,
6607 preferred_principal_votes);
6608
6609 // Find the majority preferred principal.
6610 cf_node preferred_principal = 0;
6611 cf_shash_reduce(preferred_principal_votes,
6612 clustering_principal_preferred_principal_majority_find,
6613 &preferred_principal);
6614
6615 CLUSTERING_UNLOCK();
6616
6617 cf_shash_destroy(preferred_principal_votes);
6618
6619 DETAIL("preferred principal is %"PRIx64, preferred_principal);
6620
6621 return preferred_principal;
6622}
6623
6624/**
6625 * Indicates if this node is a principal and its cluster can be merged with this
6626 * principal node's cluster.
6627 *
6628 * @param nodeid the candidate nodeid.
6629 * @param node_succession_list the candidate node's succession list.
6630 * @param node_succession_list_length the length of the node's succession list.
6631 * @return true if current node can be merged with this node's cluster.
6632 */
6633bool
6634clustering_is_merge_candidate(cf_node nodeid, cf_node* node_succession_list,
6635 int node_succession_list_length)
6636{
6637 if (node_succession_list_length <= 0 || node_succession_list[0] != nodeid) {
6638 // Not a principal node. Ignore.
6639 return false;
6640 }
6641
6642 if (nodeid < config_self_nodeid_get()) {
6643 // Has a smaller nodeid. Ignore. This node will merge with our cluster.
6644 return false;
6645 }
6646
6647 cf_vector* new_succession_list = vector_stack_lockless_create(cf_node);
6648
6649 CLUSTERING_LOCK();
6650 vector_copy_unique(new_succession_list, &g_register.succession_list);
6651 CLUSTERING_UNLOCK();
6652
6653 bool is_candidate = false;
6654
6655 // Node is the principal of its cluster. Create the new succession list.
6656 for (int i = 0; i < node_succession_list_length; i++) {
6657 cf_vector_append_unique(new_succession_list, &node_succession_list[i]);
6658 }
6659
6660 int expected_cluster_size = cf_vector_size(new_succession_list);
6661
6662 // Find and evict the nodes that are not well connected.
6663 clustering_succession_list_clique_evict(new_succession_list,
6664 "clique based evicted nodes at cluster merge:");
6665 int new_cluster_size = cf_vector_size(new_succession_list);
6666
6667 // If no nodes need to be evicted then the merge is fine.
6668 is_candidate = (expected_cluster_size == new_cluster_size);
6669
6670 // Exit:
6671 cf_vector_destroy(new_succession_list);
6672
6673 return is_candidate;
6674}
6675
6676/**
6677 * HB plugin iterate function to find principals that this node's cluster can be
6678 * merged with.
6679 */
6680static void
6681clustering_merge_candiate_find(cf_node nodeid, void* plugin_data,
6682 size_t plugin_data_size, cf_clock recv_monotonic_ts,
6683 as_hlc_msg_timestamp* msg_hlc_ts, void* udata)
6684{
6685 cf_node* candidate_principal = (cf_node*)udata;
6686
6687 CLUSTERING_LOCK();
6688
6689 if (!clustering_hb_plugin_data_is_obsolete(
6690 g_register.cluster_modified_hlc_ts,
6691 g_register.cluster_modified_time, plugin_data, plugin_data_size,
6692 recv_monotonic_ts, msg_hlc_ts)) {
6693 uint32_t* other_succession_list_length =
6694 clustering_hb_plugin_succession_length_get(plugin_data,
6695 plugin_data_size);
6696
6697 cf_node* other_succession_list = clustering_hb_plugin_succession_get(
6698 plugin_data, plugin_data_size);
6699
6700 if (other_succession_list != NULL
6701 && clustering_is_merge_candidate(nodeid, other_succession_list,
6702 *other_succession_list_length)
6703 && *candidate_principal < nodeid) {
6704 DETAIL("principal node %"PRIx64" potential candidate for cluster merge", nodeid);
6705 *candidate_principal = nodeid;
6706 }
6707
6708 }
6709 else {
6710 DETAIL(
6711 "merge check skipped - found obsolete plugin data for node %"PRIx64,
6712 nodeid);
6713 }
6714
6715 CLUSTERING_UNLOCK();
6716}
6717
6718/**
6719 * Attempt to move to the majority preferred principal.
6720 *
6721 * @return 0 if the move to preferred principal was attempted, -1 otherwise.
6722 */
6723static int
6724clustering_preferred_principal_move()
6725{
6726 cf_node preferred_principal =
6727 clustering_principal_majority_preferred_principal_get();
6728
6729 if (preferred_principal == 0
6730 || preferred_principal == config_self_nodeid_get()) {
6731 return -1;
6732 }
6733
6734 cf_vector* succession_list = vector_stack_lockless_create(cf_node);
6735 as_cluster_key cluster_key = 0;
6736 CLUSTERING_LOCK();
6737 vector_copy(succession_list, &g_register.succession_list);
6738 cluster_key = g_register.cluster_key;
6739 // Update the time move command was sent.
6740 g_clustering.move_cmd_issue_time = cf_getms();
6741 CLUSTERING_UNLOCK();
6742
6743 INFO("majority nodes find %"PRIx64" to be a better principal - sending move command to all cluster members",
6744 preferred_principal);
6745 clustering_cluster_move_send(preferred_principal, cluster_key,
6746 succession_list);
6747 cf_vector_destroy(succession_list);
6748
6749 return 0;
6750}
6751
6752/**
6753 * Attempt to merge with a larger adjacent cluster is the resulting cluster will
6754 * form a clique.
6755 *
6756 * @return 0 if a merge is attempted, -1 otherwise.
6757 */
6758static int
6759clustering_merge_attempt()
6760{
6761 int rv = -1;
6762 CLUSTERING_LOCK();
6763 cf_vector* succession_list = vector_stack_lockless_create(cf_node);
6764 vector_copy(succession_list, &g_register.succession_list);
6765 as_cluster_key cluster_key = g_register.cluster_key;
6766 cf_node candidate_principal = 0;
6767
6768 // Use a single iteration over the clustering data received via the
6769 // heartbeats instead of individual calls to get a consistent view and avoid
6770 // small lock and release.
6771 as_hb_plugin_data_iterate_all(AS_HB_PLUGIN_CLUSTERING,
6772 clustering_merge_candiate_find, &candidate_principal);
6773
6774 CLUSTERING_UNLOCK();
6775
6776 if (candidate_principal == 0) {
6777 DEBUG("no cluster merge candidates found");
6778 rv = -1;
6779 goto Exit;
6780 }
6781
6782 // Send a move command to all nodes in the succession list. Need not switch
6783 // to orphan state immediately, this node will receive the move command too
6784 // and will handle the move accordingly.
6785 INFO("this cluster can merge with cluster with principal %"PRIx64" - sending move command to all cluster members",
6786 candidate_principal);
6787 clustering_cluster_move_send(candidate_principal, cluster_key,
6788 succession_list);
6789 rv = 0;
6790Exit:
6791 cf_vector_destroy(succession_list);
6792 return rv;
6793}
6794
6795/**
6796 * Handle quantum interval start when self node is the principal of its cluster.
6797 */
6798static void
6799clustering_principal_quantum_interval_start_handle(
6800 as_clustering_internal_event* event)
6801{
6802 DETAIL("principal node quantum wakeup");
6803
6804 if (as_hb_self_is_duplicate()) {
6805 // Cluster is in a bad shape and self node has a duplicate node-id.
6806 register_become_orphan (AS_CLUSTERING_MEMBERSHIP_LOST);
6807 return;
6808 }
6809
6810 CLUSTERING_LOCK();
6811 bool paxos_proposal_started = false;
6812
6813 cf_vector* dead_nodes = vector_stack_lockless_create(cf_node);
6814 clustering_dead_nodes_find(dead_nodes);
6815
6816 log_cf_node_vector("dead nodes at quantum start:", dead_nodes,
6817 cf_vector_size(dead_nodes) > 0 ? CF_INFO : CF_DEBUG);
6818
6819 cf_vector* faulty_nodes = vector_stack_lockless_create(cf_node);
6820 clustering_faulty_nodes_find(faulty_nodes);
6821
6822 log_cf_node_vector("faulty nodes at quantum start:", faulty_nodes,
6823 cf_vector_size(faulty_nodes) > 0 ? CF_INFO : CF_DEBUG);
6824
6825 // Having dead node or faulty nodes is a sign of cluster integrity breach.
6826 // New nodes should not count as integrity breach.
6827 g_clustering.has_integrity = cf_vector_size(faulty_nodes) == 0
6828 && cf_vector_size(dead_nodes) == 0;
6829
6830 cf_vector* new_nodes = vector_stack_lockless_create(cf_node);
6831 clustering_nodes_to_add_get(new_nodes);
6832 log_cf_node_vector("join requests at quantum start:", new_nodes,
6833 cf_vector_size(new_nodes) > 0 ? CF_INFO : CF_DEBUG);
6834
6835 cf_vector* new_succession_list = vector_stack_lockless_create(cf_node);
6836 vector_copy_unique(new_succession_list, &g_register.succession_list);
6837 vector_subtract(new_succession_list, dead_nodes);
6838 vector_subtract(new_succession_list, faulty_nodes);
6839 vector_copy_unique(new_succession_list, new_nodes);
6840
6841 // Add self node. We should not miss self in the succession list, but be
6842 // doubly sure.
6843 cf_node self_nodeid = config_self_nodeid_get();
6844 cf_vector_append_unique(new_succession_list, &self_nodeid);
6845
6846 vector_sort_unique(new_succession_list, cf_node_compare_desc);
6847 uint32_t num_evicted = clustering_succession_list_clique_evict(
6848 new_succession_list,
6849 "clique based evicted nodes at quantum start:");
6850
6851 if (event->quantum_interval_is_skippable && cf_vector_size(dead_nodes) != 0
6852 && !quantum_interval_is_adjacency_fault_seen()) {
6853 // There is an imminent adjacency fault that has not been seen by the
6854 // quantum interval generator, lets not take any action.
6855 DEBUG("adjacency fault imminent - skipping quantum interval handling");
6856 quantum_interval_mark_postponed();
6857 goto Exit;
6858 }
6859
6860 if (event->quantum_interval_is_skippable && num_evicted != 0
6861 && !quantum_interval_is_peer_adjacency_fault_seen()) {
6862 // There is an imminent adjacency fault that has not been seen by the
6863 // quantum interval generator, lets not take any action.
6864 DEBUG(
6865 "peer adjacency fault imminent - skipping quantum interval handling");
6866 quantum_interval_mark_postponed();
6867 goto Exit;
6868 }
6869
6870 if (cf_vector_size(faulty_nodes) == 0 && cf_vector_size(dead_nodes) == 0) {
6871 // We might have only pending join requests. Attempt a move to a
6872 // preferred principal or a merge before trying to add new nodes.
6873 if (clustering_preferred_principal_move() == 0
6874 || clustering_merge_attempt() == 0) {
6875 goto Exit;
6876 }
6877 }
6878
6879 if (vector_equals(new_succession_list, &g_register.succession_list)
6880 && cf_vector_size(faulty_nodes) == 0) {
6881 // There is no change in the succession list and also there are no
6882 // faulty nodes. If there are faulty nodes they have probably restarted
6883 // quickly, in which case a new cluster transition with the same
6884 // succession list is required.
6885 goto Exit;
6886 }
6887
6888 if (cf_vector_size(faulty_nodes) != 0
6889 && cf_vector_size(new_succession_list) == 1) {
6890 // This node most likely lost time (slept/paused) and the rest of the
6891 // cluster reformed. Its best to go to the orphan state and start from
6892 // there instead of moving to a single node cluster and again eventually
6893 // forming a larger cluster.
6894 WARNING(
6895 "all cluster members are part of different cluster - changing state to orphan");
6896 register_become_orphan (AS_CLUSTERING_MEMBERSHIP_LOST);
6897 goto Exit;
6898 }
6899
6900 // Start a new paxos round.
6901 log_cf_node_vector("current succession list", &g_register.succession_list,
6902 CF_DEBUG);
6903
6904 log_cf_node_vector("proposed succession list", new_succession_list,
6905 CF_DEBUG);
6906 DEBUG("proposed cluster size %d", cf_vector_size(new_succession_list));
6907
6908 as_paxos_start_result result = paxos_proposer_proposal_start(
6909 new_succession_list, new_succession_list);
6910
6911 // Log paxos result.
6912 paxos_result_log(result, new_succession_list);
6913
6914 // TODO: Should we move to orphan state if there are not enough nodes in the
6915 // cluster.
6916 // Tentatively yes....
6917 if (result == AS_PAXOS_RESULT_CLUSTER_TOO_SMALL) {
6918 register_become_orphan (AS_CLUSTERING_MEMBERSHIP_LOST);
6919 }
6920
6921 paxos_proposal_started = (result == AS_PAXOS_RESULT_STARTED);
6922Exit:
6923 // Although these are stack vectors the contents can be heap allocated on
6924 // resize. Destroy call is prudent.
6925 cf_vector_destroy(dead_nodes);
6926 cf_vector_destroy(faulty_nodes);
6927 cf_vector_destroy(new_nodes);
6928 cf_vector_destroy(new_succession_list);
6929
6930 // Compute list of rejected nodes.
6931 if (paxos_proposal_started) {
6932 // Nodes in set (pending_join - new succession list) could not be
6933 // accomodated and should receive a join reject.
6934 vector_subtract(&g_clustering.pending_join_requests,
6935 new_succession_list);
6936 }
6937 else {
6938 // Nodes in set (pending_join - current succession list) could not be
6939 // accomodated and should receive a join reject.
6940 vector_subtract(&g_clustering.pending_join_requests,
6941 &g_register.succession_list);
6942
6943 }
6944
6945 cf_vector* rejected_nodes = vector_stack_lockless_create(cf_node);
6946 vector_copy_unique(rejected_nodes, &g_clustering.pending_join_requests);
6947
6948 // Clear the pending join requests
6949 vector_clear(&g_clustering.pending_join_requests);
6950
6951 // Send reject messages to rejected nodes.
6952 clustering_join_requests_reject(rejected_nodes);
6953
6954 cf_vector_destroy(rejected_nodes);
6955
6956 CLUSTERING_UNLOCK();
6957}
6958
6959/**
6960 * Check for and handle eviction by self node's principal.
6961 *
6962 * @param principal_plugin_data the pluging data for the principal.
6963 * @param plugin_data_hlc_ts the hlc timestamp when the plugin data was
6964 * received.
6965 * @param plugin_data_ts the monotonic clock timestamp when the plugin data was
6966 * recvied.
6967 */
6968static void
6969clustering_non_principal_evicted_check(cf_node principal_nodeid,
6970 as_hb_plugin_node_data* principal_plugin_data,
6971 as_hlc_msg_timestamp* plugin_data_hlc_ts, cf_clock plugin_data_ts)
6972{
6973 CLUSTERING_LOCK();
6974 bool is_evicted = false;
6975
6976 if (!as_hb_is_alive(principal_nodeid)) {
6977 is_evicted = true;
6978 goto Exit;
6979 }
6980
6981 if (!clustering_is_our_principal(principal_nodeid)
6982 || clustering_hb_plugin_data_is_obsolete(
6983 g_register.cluster_modified_hlc_ts,
6984 g_register.cluster_modified_time,
6985 principal_plugin_data->data,
6986 principal_plugin_data->data_size, plugin_data_ts,
6987 plugin_data_hlc_ts)) {
6988 // The plugin data is obsolete. Can't take decisions based on it.
6989 goto Exit;
6990 }
6991
6992 // Get the changed node's succession list, cluster key. All the fields
6993 // should be present since the obsolete check also checked for fields being
6994 // valid.
6995 cf_node* succession_list_p = clustering_hb_plugin_succession_get(
6996 principal_plugin_data->data, principal_plugin_data->data_size);
6997 uint32_t* succession_list_length_p =
6998 clustering_hb_plugin_succession_length_get(
6999 principal_plugin_data->data,
7000 principal_plugin_data->data_size);
7001
7002 // Check if we have been evicted.
7003 if (!clustering_is_node_in_succession(config_self_nodeid_get(),
7004 succession_list_p, *succession_list_length_p)) {
7005 is_evicted = true;
7006 }
7007
7008Exit:
7009 if (is_evicted) {
7010 // This node has been evicted from the cluster.
7011 WARNING("evicted from cluster by principal node %"PRIx64"- changing state to orphan",
7012 principal_nodeid);
7013 register_become_orphan (AS_CLUSTERING_MEMBERSHIP_LOST);
7014 }
7015
7016 CLUSTERING_UNLOCK();
7017}
7018
7019/**
7020 * Monitor plugin data change events for evictions.
7021 */
7022static void
7023clustering_non_principal_hb_plugin_data_changed_handle(
7024 as_clustering_internal_event* change_event)
7025{
7026 clustering_non_principal_evicted_check(
7027 change_event->plugin_data_changed_nodeid, change_event->plugin_data,
7028 &change_event->plugin_data_changed_hlc_ts,
7029 change_event->plugin_data_changed_ts);
7030}
7031
7032/**
7033 * Update the preferred principal in the non-principal mode.
7034 */
7035static void
7036clustering_non_principal_preferred_principal_update()
7037{
7038 cf_node current_principal = 0;
7039 if (clustering_principal_get(&current_principal) != 0
7040 || current_principal == 0) {
7041 // We are an orphan.
7042 return;
7043 }
7044
7045 cf_vector* new_succession_list = vector_stack_lockless_create(cf_node);
7046
7047 clustering_neighboring_nodes_get(new_succession_list);
7048 cf_node self_nodeid = config_self_nodeid_get();
7049 cf_vector_append(new_succession_list, &self_nodeid);
7050
7051 clustering_succession_list_clique_evict(new_succession_list,
7052 "clique based evicted nodes while updating preferred principal:");
7053
7054 // Sort the new succession list.
7055 vector_sort_unique(new_succession_list, cf_node_compare_desc);
7056
7057 cf_node preferred_principal = 0;
7058 int new_cluster_size = cf_vector_size(new_succession_list);
7059 if (new_cluster_size > 0) {
7060 if (vector_find(new_succession_list, &current_principal) < 0) {
7061 cf_vector_get(new_succession_list, 0, &preferred_principal);
7062 }
7063 }
7064
7065 CLUSTERING_LOCK();
7066 if (preferred_principal != 0
7067 && g_clustering.preferred_principal != preferred_principal) {
7068 INFO("preferred principal updated to %"PRIx64,
7069 g_clustering.preferred_principal);
7070 }
7071 g_clustering.preferred_principal = preferred_principal;
7072
7073 cf_vector_destroy(new_succession_list);
7074 CLUSTERING_UNLOCK();
7075}
7076
7077/**
7078 * Handle quantum interval start in the non principal state.
7079 */
7080static void
7081clustering_non_principal_quantum_interval_start_handle()
7082{
7083 // Reject all accumulated join requests since we are no longer a principal.
7084 clustering_join_requests_reject_all();
7085
7086 if (as_hb_self_is_duplicate()) {
7087 // Cluster is in a bad shape and self node has a duplicate node-id.
7088 register_become_orphan (AS_CLUSTERING_MEMBERSHIP_LOST);
7089 return;
7090 }
7091
7092 // Update the preferred principal.
7093 clustering_non_principal_preferred_principal_update();
7094
7095 // Check if we have been evicted.
7096 cf_node principal = 0;
7097
7098 if (clustering_principal_get(&principal) != 0) {
7099 WARNING("could not get principal for self node");
7100 return;
7101 }
7102
7103 as_hlc_msg_timestamp plugin_data_hlc_ts;
7104 cf_clock plugin_data_ts = 0;
7105 as_hb_plugin_node_data plugin_data = { 0 };
7106
7107 if (clustering_hb_plugin_data_get(principal, &plugin_data,
7108 &plugin_data_hlc_ts, &plugin_data_ts) != 0) {
7109 plugin_data_ts = 0;
7110 memset(&plugin_data, 0, sizeof(plugin_data));
7111 }
7112
7113 clustering_non_principal_evicted_check(principal, &plugin_data,
7114 &plugin_data_hlc_ts, plugin_data_ts);
7115}
7116
7117/**
7118 * Handle quantum interval start.
7119 */
7120static void
7121clustering_quantum_interval_start_handle(as_clustering_internal_event* event)
7122{
7123 CLUSTERING_LOCK();
7124
7125 // Dispatch based on state.
7126 switch (g_clustering.state) {
7127 case AS_CLUSTERING_STATE_ORPHAN:
7128 clustering_orphan_quantum_interval_start_handle();
7129 break;
7130 case AS_CLUSTERING_STATE_PRINCIPAL:
7131 clustering_principal_quantum_interval_start_handle(event);
7132 break;
7133 case AS_CLUSTERING_STATE_NON_PRINCIPAL:
7134 clustering_non_principal_quantum_interval_start_handle();
7135 default:
7136 break;
7137 }
7138
7139 CLUSTERING_UNLOCK();
7140}
7141
7142/**
7143 * Handle a timer event in the orphan state.
7144 */
7145static void
7146clustering_orphan_timer_event_handle()
7147{
7148 // Attempt a join request.
7149 DETAIL("attempting join request from orphan state");
7150 clustering_join_request_attempt();
7151}
7152
7153/**
7154 * Handle a timer event for the clustering module.
7155 */
7156static void
7157clustering_timer_event_handle()
7158{
7159 CLUSTERING_LOCK();
7160
7161 // Dispatch based on state.
7162 switch (g_clustering.state) {
7163 case AS_CLUSTERING_STATE_ORPHAN:
7164 clustering_orphan_timer_event_handle();
7165 break;
7166 default:
7167 break;
7168 }
7169
7170 CLUSTERING_UNLOCK();
7171}
7172
7173/**
7174 * Check if the incoming message is sane to be proccessed further.
7175 */
7176static bool
7177clustering_message_sanity_check(cf_node src_nodeid, msg* msg)
7178{
7179 as_cluster_proto_identifier proto;
7180 if (msg_proto_id_get(msg, &proto) != 0) {
7181 WARNING(
7182 "received message with no clustering protocol identifier from node %"PRIx64,
7183 src_nodeid);
7184 return false;
7185 }
7186
7187 return clustering_versions_are_compatible(proto,
7188 clustering_protocol_identifier_get());
7189}
7190
7191/**
7192 * Handle an incoming join request. We do not bother with older replay's for
7193 * join requests because the pending request are cleanup during new cluster
7194 * formation.
7195 */
7196static void
7197clustering_join_request_handle(as_clustering_internal_event* msg_event)
7198{
7199 cf_node src_nodeid = msg_event->msg_src_nodeid;
7200 DEBUG("received cluster join request from node %"PRIx64, src_nodeid);
7201 bool fire_quantum_event = false;
7202
7203 CLUSTERING_LOCK();
7204
7205 cf_clock now = cf_getms();
7206
7207 if (g_clustering.move_cmd_issue_time + join_request_move_reject_interval()
7208 > now) {
7209 // We have just send out a move request. Reject this join request.
7210 INFO("ignoring join request from node %"PRIx64" since we have just issued a move command",
7211 src_nodeid);
7212 clustering_join_reject_send(src_nodeid);
7213 goto Exit;
7214 }
7215
7216 if ((!clustering_is_principal() && !clustering_is_orphan())
7217 || g_clustering.last_join_request_sent_time + join_request_timeout()
7218 >= cf_getms()) {
7219 // Can't handle a join request this node is not the principal right now
7220 // or this node is trying to join another cluster.
7221 msg* msg = msg_pool_get(AS_CLUSTERING_MSG_TYPE_JOIN_REJECT);
7222
7223 DETAIL("sent join reject to node %"PRIx64, msg_event->msg_src_nodeid);
7224
7225 // Sent the message to the acceptors.
7226 msg_node_send(msg, msg_event->msg_src_nodeid);
7227
7228 goto Exit;
7229 }
7230
7231 if (vector_find(&g_clustering.pending_join_requests, &src_nodeid) >= 0) {
7232 DEBUG("ignoring join request from node %"PRIx64" since a request is already pending",
7233 src_nodeid);
7234 goto Exit;
7235 }
7236
7237 // Check if we are receiving a stale or very delayed join request.
7238 int64_t message_delay_estimate = as_hlc_timestamp_diff_ms(
7239 as_hlc_timestamp_now(), msg_event->msg_hlc_ts.send_ts);
7240 if (message_delay_estimate < 0
7241 || message_delay_estimate > join_request_accept_delay_max()) {
7242 INFO("ignoring stale join request from node %"PRIx64" - delay estimate %lu(ms) ",
7243 src_nodeid, message_delay_estimate);
7244 goto Exit;
7245 }
7246
7247 // Add this request to the pending queue.
7248 cf_vector_append_unique(&g_clustering.pending_join_requests, &src_nodeid);
7249
7250 // Generate a join request accepted event for the quantum interval
7251 // generator.
7252 as_clustering_internal_event join_request_event;
7253 memset(&join_request_event, 0, sizeof(join_request_event));
7254 join_request_event.type =
7255 AS_CLUSTERING_INTERNAL_EVENT_JOIN_REQUEST_ACCEPTED;
7256 join_request_event.join_request_source_nodeid = src_nodeid;
7257 internal_event_dispatch(&join_request_event);
7258 fire_quantum_event = true;
7259
7260 INFO("accepted join request from node %"PRIx64, src_nodeid);
7261
7262Exit:
7263 CLUSTERING_UNLOCK();
7264
7265 if (fire_quantum_event) {
7266 internal_event_dispatch(&join_request_event);
7267 }
7268}
7269
7270/**
7271 * Handle an incoming join reject.
7272 */
7273static void
7274clustering_join_reject_handle(as_clustering_internal_event* event)
7275{
7276 cf_node src_nodeid = event->msg_src_nodeid;
7277
7278 DEBUG("received cluster join reject from node %"PRIx64, src_nodeid);
7279
7280 CLUSTERING_LOCK();
7281
7282 if (!clustering_is_orphan()) {
7283 // Already part of a cluster. Ignore the reject.
7284 INFO(
7285 "already part of a cluster - ignoring join reject from node %"PRIx64,
7286 src_nodeid);
7287 goto Exit;
7288 }
7289
7290 if (paxos_proposer_proposal_is_active()) {
7291 // This node is attempting to form a new cluster.
7292 INFO(
7293 "already trying to form a cluster - ignoring join reject from node %"PRIx64,
7294 src_nodeid);
7295 goto Exit;
7296 }
7297
7298 if (g_clustering.last_join_request_principal == src_nodeid) {
7299 // This node had requested the source principal for cluster membership
7300 // which was rejected. Try and join a different cluster.
7301
7302 // This join request should not be considered as pending, so reset the
7303 // join request sent time.
7304 g_clustering.last_join_request_sent_time = 0;
7305 g_clustering.last_join_request_principal = 0;
7306 clustering_join_request_attempt();
7307 }
7308
7309Exit:
7310 CLUSTERING_UNLOCK();
7311}
7312
7313/**
7314 * Handle an incoming merge move command. Basically this node switched to orphan
7315 * state and sends a join request to the principal listed in the merge move.
7316 */
7317static void
7318clustering_merge_move_handle(as_clustering_internal_event* event)
7319{
7320 cf_node src_nodeid = event->msg_src_nodeid;
7321
7322 DEBUG("received cluster merge move from node %"PRIx64, src_nodeid);
7323
7324 CLUSTERING_LOCK();
7325
7326 as_cluster_key msg_cluster_key = 0;
7327 msg_cluster_key_get(event->msg, &msg_cluster_key);
7328
7329 if (clustering_is_orphan()) {
7330 // Already part of a cluster. Ignore the reject.
7331 INFO(
7332 "already orphan node - ignoring merge move command from node %"PRIx64,
7333 src_nodeid);
7334 goto Exit;
7335 }
7336
7337 if (msg_is_obsolete(g_register.cluster_modified_hlc_ts,
7338 g_register.cluster_modified_time, event->msg_recvd_ts,
7339 &event->msg_hlc_ts) || !clustering_is_our_principal(src_nodeid)
7340 || paxos_proposer_proposal_is_active()
7341 || msg_cluster_key != g_register.cluster_key) {
7342 INFO("ignoring cluster merge move from node %"PRIx64, src_nodeid);
7343 goto Exit;
7344 }
7345
7346 // Madril simulation black lists current principal so that we do not end up
7347 // joining him again immediately. However the check for obsolete data should
7348 // make that check from madril redundant.
7349 cf_node new_principal = 0;
7350
7351 if (msg_proposed_principal_get(event->msg, &new_principal) != 0) {
7352 // Move command does not have the proposed principal
7353 WARNING(
7354 "received merge move command without a proposed principal. Will join the first available principal");
7355 new_principal = 0;
7356 }
7357
7358 // Switch to orphan cluster state so that we move to the new principal.
7359 register_become_orphan (AS_CLUSTERING_ATTEMPTING_MERGE);
7360
7361 // Send a join request to a the new principal
7362 clustering_principal_join_request_attempt(new_principal);
7363Exit:
7364 CLUSTERING_UNLOCK();
7365}
7366
7367/**
7368 * Handle an incoming message.
7369 */
7370static void
7371clustering_msg_event_handle(as_clustering_internal_event* msg_event)
7372{
7373 // Delegate handling based on message type.
7374 switch (msg_event->msg_type) {
7375 case AS_CLUSTERING_MSG_TYPE_JOIN_REQUEST:
7376 clustering_join_request_handle(msg_event);
7377 break;
7378 case AS_CLUSTERING_MSG_TYPE_JOIN_REJECT:
7379 clustering_join_reject_handle(msg_event);
7380 break;
7381 case AS_CLUSTERING_MSG_TYPE_MERGE_MOVE:
7382 clustering_merge_move_handle(msg_event);
7383 break;
7384 default: // Non cluster management messages.
7385 break;
7386 }
7387}
7388
7389/**
7390 * Fabric msg listener that generates an internal message event and dispatches
7391 * it to the sub system.
7392 */
7393static int
7394clustering_fabric_msg_listener(cf_node msg_src_nodeid, msg* msg, void* udata)
7395{
7396 if (!clustering_is_running()) {
7397 // Ignore fabric messages when clustering is not running.
7398 WARNING("clustering stopped - ignoring message from node %"PRIx64,
7399 msg_src_nodeid);
7400 goto Exit;
7401 }
7402
7403 // Sanity check.
7404 if (!clustering_message_sanity_check(msg_src_nodeid, msg)) {
7405 WARNING("invalid mesage received from node %"PRIx64, msg_src_nodeid);
7406 goto Exit;
7407 }
7408
7409 as_clustering_internal_event msg_event;
7410 memset(&msg_event, 0, sizeof(msg_event));
7411 msg_event.type = AS_CLUSTERING_INTERNAL_EVENT_MSG;
7412
7413 msg_event.msg_src_nodeid = msg_src_nodeid;
7414
7415 // Update hlc and store update message timestamp for the event.
7416 as_hlc_timestamp send_ts = 0;
7417 msg_send_ts_get(msg, &send_ts);
7418 as_hlc_timestamp_update(msg_event.msg_src_nodeid, send_ts,
7419 &msg_event.msg_hlc_ts);
7420
7421 msg_event.msg = msg;
7422 msg_event.msg_recvd_ts = cf_getms();
7423 msg_type_get(msg, &msg_event.msg_type);
7424
7425 internal_event_dispatch(&msg_event);
7426
7427Exit:
7428 as_fabric_msg_put(msg);
7429 return 0;
7430}
7431
7432/**
7433 * Handle register cluster changed.
7434 */
7435static void
7436clustering_register_cluster_changed_handle()
7437{
7438 CLUSTERING_LOCK();
7439
7440 if (paxos_proposer_proposal_is_active()) {
7441 paxos_proposer_fail();
7442 }
7443
7444 if (clustering_is_principal()) {
7445 g_clustering.state = AS_CLUSTERING_STATE_PRINCIPAL;
7446 }
7447 else {
7448 g_clustering.state = AS_CLUSTERING_STATE_NON_PRINCIPAL;
7449 // We are a non-principal. Reject all pending join requests.
7450 clustering_join_requests_reject_all();
7451 }
7452
7453 g_clustering.preferred_principal = 0;
7454 g_clustering.last_join_request_principal = 0;
7455 g_clustering.move_cmd_issue_time = 0;
7456
7457 CLUSTERING_UNLOCK();
7458}
7459
7460/**
7461 * Handle register synced events. Basically this means it is safe to publish the
7462 * cluster changed event to external sub systems.
7463 */
7464static void
7465clustering_register_cluster_synced_handle(as_clustering_internal_event* event)
7466{
7467 CLUSTERING_LOCK();
7468
7469 // Queue the cluster change event for publishing.
7470 as_clustering_event cluster_change_event;
7471 cluster_change_event.type = AS_CLUSTERING_CLUSTER_CHANGED;
7472 cluster_change_event.qualifier = event->qualifier;
7473 cluster_change_event.cluster_key = g_register.cluster_key;
7474 cluster_change_event.succession_list = &g_register.succession_list;
7475 external_event_queue(&cluster_change_event);
7476
7477 g_clustering.has_integrity = true;
7478
7479 CLUSTERING_UNLOCK();
7480}
7481
7482/**
7483 * Handle the register going to orphaned state.
7484 */
7485static void
7486clustering_register_orphaned_handle(as_clustering_internal_event* event)
7487{
7488 CLUSTERING_LOCK();
7489 g_clustering.state = AS_CLUSTERING_STATE_ORPHAN;
7490 g_clustering.orphan_state_start_time = cf_getms();
7491 g_clustering.preferred_principal = 0;
7492
7493 // Queue the cluster change event for publishing.
7494 as_clustering_event orphaned_event;
7495 orphaned_event.type = AS_CLUSTERING_ORPHANED;
7496 orphaned_event.qualifier = event->qualifier;
7497 orphaned_event.cluster_key = 0;
7498 orphaned_event.succession_list = NULL;
7499 external_event_queue(&orphaned_event);
7500 CLUSTERING_UNLOCK();
7501}
7502
7503/**
7504 * Handle hb plugin data change by dispatching it based on clustering change.
7505 */
7506static void
7507clustering_hb_plugin_data_changed_event_handle(
7508 as_clustering_internal_event* change_event)
7509{
7510 CLUSTERING_LOCK();
7511 switch (g_clustering.state) {
7512 case AS_CLUSTERING_STATE_NON_PRINCIPAL:
7513 clustering_non_principal_hb_plugin_data_changed_handle(change_event);
7514 break;
7515 default:
7516 break;
7517 }
7518 CLUSTERING_UNLOCK();
7519}
7520
7521/**
7522 * Handle heartbeat event.
7523 */
7524static void
7525clustering_hb_event_handle(as_clustering_internal_event* hb_event)
7526{
7527 for (int i = 0; i < hb_event->hb_n_events; i++) {
7528 if (hb_event->hb_events[i].evt == AS_HB_NODE_DEPART
7529 && clustering_is_our_principal(hb_event->hb_events[i].nodeid)) {
7530 // Our principal is no longer visible.
7531 INFO("principal node %"PRIx64" departed - switching to orphan state",
7532 hb_event->hb_events[i].nodeid);
7533 register_become_orphan (AS_CLUSTERING_MEMBERSHIP_LOST);
7534 }
7535 }
7536}
7537
7538/**
7539 * Handle the fail of a paxos proposal started by the self node.
7540 */
7541static void
7542clustering_paxos_proposer_fail_handle()
7543{
7544 // Send reject to all pending join requesters.
7545 clustering_join_requests_reject_all();
7546}
7547
7548/**
7549 * Clustering module event handler.
7550 */
7551static void
7552clustering_event_handle(as_clustering_internal_event* event)
7553{
7554 // Lock to enusure the entire event handling is atomic and parallel events
7555 // events (hb/fabric) do not interfere.
7556 CLUSTERING_LOCK();
7557
7558 switch (event->type) {
7559 case AS_CLUSTERING_INTERNAL_EVENT_TIMER:
7560 clustering_timer_event_handle();
7561 break;
7562 case AS_CLUSTERING_INTERNAL_EVENT_QUANTUM_INTERVAL_START:
7563 clustering_quantum_interval_start_handle(event);
7564 break;
7565 case AS_CLUSTERING_INTERNAL_EVENT_HB:
7566 clustering_hb_event_handle(event);
7567 break;
7568 case AS_CLUSTERING_INTERNAL_EVENT_HB_PLUGIN_DATA_CHANGED:
7569 clustering_hb_plugin_data_changed_event_handle(event);
7570 break;
7571 case AS_CLUSTERING_INTERNAL_EVENT_MSG:
7572 clustering_msg_event_handle(event);
7573 break;
7574 case AS_CLUSTERING_INTERNAL_EVENT_REGISTER_ORPHANED:
7575 clustering_register_orphaned_handle(event);
7576 break;
7577 case AS_CLUSTERING_INTERNAL_EVENT_REGISTER_CLUSTER_CHANGED:
7578 clustering_register_cluster_changed_handle();
7579 break;
7580 case AS_CLUSTERING_INTERNAL_EVENT_REGISTER_CLUSTER_SYNCED:
7581 clustering_register_cluster_synced_handle(event);
7582 break;
7583 case AS_CLUSTERING_INTERNAL_EVENT_PAXOS_PROPOSER_FAIL: // Send reject message to all
7584 clustering_paxos_proposer_fail_handle();
7585 break;
7586 default: // Not of interest for main clustering module.
7587 break;
7588 }
7589
7590 CLUSTERING_UNLOCK();
7591}
7592
7593/**
7594 * Initialize the template to be used for clustering messages.
7595 */
7596static void
7597clustering_msg_init()
7598{
7599 // Register fabric clustering msg type with no processing function:
7600 // This permits getting / putting clustering msgs to be moderated via an
7601 // idle msg queue.
7602 as_fabric_register_msg_fn(M_TYPE_CLUSTERING, g_clustering_msg_template,
7603 sizeof(g_clustering_msg_template), AS_CLUSTERING_MSG_SCRATCH_SIZE,
7604 clustering_fabric_msg_listener, NULL);
7605}
7606
7607/**
7608 * Change listener that updates the first time in current quantum.
7609 */
7610static void
7611clustering_hb_plugin_data_change_listener(cf_node changed_node_id)
7612{
7613 if (!clustering_is_running()) {
7614 return;
7615 }
7616
7617 DETAIL("cluster information change detected for node %"PRIx64,
7618 changed_node_id);
7619
7620 as_hb_plugin_node_data plugin_data;
7621 as_clustering_internal_event change_event;
7622 memset(&change_event, 0, sizeof(change_event));
7623 change_event.type = AS_CLUSTERING_INTERNAL_EVENT_HB_PLUGIN_DATA_CHANGED;
7624 change_event.plugin_data_changed_nodeid = changed_node_id;
7625 change_event.plugin_data = &plugin_data;
7626
7627 if (clustering_hb_plugin_data_get(changed_node_id, &plugin_data,
7628 &change_event.plugin_data_changed_hlc_ts,
7629 &change_event.plugin_data_changed_ts) != 0) {
7630 // Not possible. We should be able to read the plugin data that changed.
7631 return;
7632 }
7633 internal_event_dispatch(&change_event);
7634}
7635
7636/**
7637 * Listen to external heartbeat event and dispatch an internal heartbeat event.
7638 */
7639static void
7640clustering_hb_event_listener(int n_events, as_hb_event_node* hb_node_events,
7641 void* udata)
7642{
7643 if (!clustering_is_running()) {
7644 return;
7645 }
7646
7647 // Wrap the events in an internal event and dispatch.
7648 as_clustering_internal_event hb_event;
7649 memset(&hb_event, 0, sizeof(hb_event));
7650 hb_event.type = AS_CLUSTERING_INTERNAL_EVENT_HB;
7651 hb_event.hb_n_events = n_events;
7652 hb_event.hb_events = hb_node_events;
7653
7654 internal_event_dispatch(&hb_event);
7655}
7656
7657/**
7658 * Reform the cluster with the same succession list.This would trigger the
7659 * generation of new partition info and the cluster would get a new cluster key.
7660 *
7661 * @return 0 if new clustering round started, 1 if not principal, -1 otherwise.
7662 */
7663static int
7664clustering_cluster_reform()
7665{
7666 int rv = -1;
7667 CLUSTERING_LOCK();
7668
7669 cf_vector* dead_nodes = vector_stack_lockless_create(cf_node);
7670 clustering_dead_nodes_find(dead_nodes);
7671
7672 log_cf_node_vector("recluster: dead nodes - ", dead_nodes,
7673 cf_vector_size(dead_nodes) > 0 ? CF_INFO : CF_DEBUG);
7674
7675 cf_vector* faulty_nodes = vector_stack_lockless_create(cf_node);
7676 clustering_faulty_nodes_find(faulty_nodes);
7677
7678 log_cf_node_vector("recluster: faulty nodes - ", faulty_nodes,
7679 cf_vector_size(faulty_nodes) > 0 ? CF_INFO : CF_DEBUG);
7680
7681 cf_vector* new_nodes = vector_stack_lockless_create(cf_node);
7682 clustering_nodes_to_add_get(new_nodes);
7683 log_cf_node_vector("recluster: pending join requests - ", new_nodes,
7684 cf_vector_size(new_nodes) > 0 ? CF_INFO : CF_DEBUG);
7685
7686 if (!clustering_is_running() || !clustering_is_principal()
7687 || cf_vector_size(dead_nodes) > 0
7688 || cf_vector_size(faulty_nodes) > 0
7689 || cf_vector_size(new_nodes) > 0) {
7690 INFO(
7691 "recluster: skipped - principal %s dead_nodes %d faulty_nodes %d new_nodes %d",
7692 clustering_is_principal() ? "true" : "false",
7693 cf_vector_size(dead_nodes), cf_vector_size(faulty_nodes),
7694 cf_vector_size(new_nodes));
7695
7696 if (!clustering_is_principal()) {
7697 // Common case - command will likely be sent to all nodes.
7698 rv = 1;
7699 }
7700
7701 goto Exit;
7702 }
7703
7704 cf_vector* succession_list = vector_stack_lockless_create(cf_node);
7705 vector_copy(succession_list, &g_register.succession_list);
7706
7707 log_cf_node_vector(
7708 "recluster: principal node - reforming new cluster with succession list:",
7709 succession_list, CF_INFO);
7710
7711 as_paxos_start_result result = paxos_proposer_proposal_start(
7712 succession_list, succession_list);
7713
7714 // Log paxos result.
7715 paxos_result_log(result, succession_list);
7716
7717 rv = (result == AS_PAXOS_RESULT_STARTED) ? 0 : -1;
7718
7719 if (rv == -1) {
7720 INFO("recluster: skipped");
7721 }
7722 else {
7723 INFO("recluster: triggered...");
7724 }
7725
7726 cf_vector_destroy(succession_list);
7727
7728Exit:
7729 cf_vector_destroy(dead_nodes);
7730 cf_vector_destroy(faulty_nodes);
7731 cf_vector_destroy(new_nodes);
7732 CLUSTERING_UNLOCK();
7733 return rv;
7734}
7735
7736/**
7737 * Initialize clustering subsystem.
7738 */
7739static void
7740clustering_init()
7741{
7742 if (clustering_is_initialized()) {
7743 return;
7744 }
7745
7746 CLUSTERING_LOCK();
7747 memset(&g_clustering, 0, sizeof(g_clustering));
7748
7749 // Start out as an orphan cluster.
7750 g_clustering.state = AS_CLUSTERING_STATE_ORPHAN;
7751 g_clustering.orphan_state_start_time = cf_getms();
7752
7753 g_clustering.join_request_blackout = cf_shash_create(cf_nodeid_shash_fn,
7754 sizeof(cf_node), sizeof(cf_clock),
7755 AS_CLUSTERING_CLUSTER_MAX_SIZE_SOFT, 0);
7756
7757 vector_lockless_init(&g_clustering.pending_join_requests, cf_node);
7758
7759 // Register as a plugin with the heartbeat subsystem.
7760 as_hb_plugin clustering_plugin;
7761 memset(&clustering_plugin, 0, sizeof(clustering_plugin));
7762
7763 clustering_plugin.id = AS_HB_PLUGIN_CLUSTERING;
7764 // Includes the size for the protocol version, the cluster key, the paxos
7765 // sequence number for current cluster and the preferred principal.
7766 clustering_plugin.wire_size_fixed = sizeof(uint32_t)
7767 + sizeof(as_cluster_key) + sizeof(as_paxos_sequence_number)
7768 + sizeof(cf_node);
7769 // Size of the node in succession list.
7770 clustering_plugin.wire_size_per_node = sizeof(cf_node);
7771 clustering_plugin.set_fn = clustering_hb_plugin_set_fn;
7772 clustering_plugin.parse_fn = clustering_hb_plugin_parse_data_fn;
7773 clustering_plugin.change_listener =
7774 clustering_hb_plugin_data_change_listener;
7775
7776 as_hb_plugin_register(&clustering_plugin);
7777
7778 // Register as hb event listener
7779 as_hb_register_listener(clustering_hb_event_listener, NULL);
7780
7781 // Initialize fabric message pool.
7782 clustering_msg_init();
7783
7784 // Initialize external event publisher.
7785 external_event_publisher_init();
7786
7787 // Initialize the register.
7788 register_init();
7789
7790 // Initialize timer.
7791 timer_init();
7792
7793 // Initialize the quantum interval generator
7794 quantum_interval_generator_init();
7795
7796 // Initialize paxos.
7797 paxos_init();
7798
7799 g_clustering.sys_state = AS_CLUSTERING_SYS_STATE_STOPPED;
7800
7801 DETAIL("clustering module initialized");
7802
7803 CLUSTERING_UNLOCK();
7804}
7805
7806/**
7807 * Start the clustering sub-system.
7808 */
7809static void
7810clustering_start()
7811{
7812 if (clustering_is_running()) {
7813 return;
7814 }
7815
7816 CLUSTERING_LOCK();
7817 g_clustering.sys_state = AS_CLUSTERING_SYS_STATE_RUNNING;
7818 CLUSTERING_UNLOCK();
7819
7820 // Start quantum interval generator.
7821 quantum_interval_generator_start();
7822
7823 // Start the timer.
7824 timer_start();
7825
7826 // Start the external event publisher.
7827 external_event_publisher_start();
7828}
7829
7830/**
7831 * Stop the clustering sub-system.
7832 */
7833static void
7834clustering_stop()
7835{
7836 if (!clustering_is_running()) {
7837 return;
7838 }
7839
7840 CLUSTERING_LOCK();
7841 g_clustering.sys_state = AS_CLUSTERING_SYS_STATE_SHUTTING_DOWN;
7842 CLUSTERING_UNLOCK();
7843
7844 // Stop the timer.
7845 timer_stop();
7846
7847 // Stop the external event publisher.
7848 external_event_publisher_stop();
7849
7850 CLUSTERING_LOCK();
7851 g_clustering.sys_state = AS_CLUSTERING_SYS_STATE_STOPPED;
7852 CLUSTERING_UNLOCK();
7853}
7854
7855/**
7856 * Dump clustering state to logs.
7857 */
7858static void
7859clustering_dump(bool verbose)
7860{
7861 if (!clustering_is_running()) {
7862 INFO("CL: stopped");
7863 return;
7864 }
7865
7866 paxos_proposer_dump(verbose);
7867 paxos_acceptor_dump(verbose);
7868 register_dump(verbose);
7869
7870 CLUSTERING_LOCK();
7871
7872 switch (g_clustering.state) {
7873 case AS_CLUSTERING_STATE_ORPHAN:
7874 INFO("CL: state: orphan");
7875 break;
7876 case AS_CLUSTERING_STATE_PRINCIPAL:
7877 INFO("CL: state: principal");
7878 break;
7879 case AS_CLUSTERING_STATE_NON_PRINCIPAL:
7880 INFO("CL: state: non-principal");
7881 break;
7882 }
7883
7884 INFO("CL: %s",
7885 g_clustering.has_integrity ? "has integrity" : "integrity fault");
7886 cf_node current_principal;
7887 if (clustering_principal_get(&current_principal) != 0) {
7888 if (g_clustering.preferred_principal != current_principal) {
7889 INFO("CL: preferred principal %"PRIx64,
7890 g_clustering.preferred_principal);
7891 }
7892 }
7893
7894 if (g_clustering.state == AS_CLUSTERING_STATE_ORPHAN) {
7895 INFO("CL: join request sent to principal %"PRIx64,
7896 g_clustering.last_join_request_principal);
7897 INFO("CL: join request sent time: %"PRIu64" now: %"PRIu64 ,
7898 g_clustering.last_join_request_sent_time, cf_getms());
7899 }
7900
7901 if (verbose) {
7902 log_cf_node_vector("CL: pending join requests:",
7903 &g_clustering.pending_join_requests, CF_INFO);
7904 }
7905
7906 CLUSTERING_UNLOCK();
7907}
7908
7909/*
7910 * ----------------------------------------------------------------------------
7911 * Internal event dispatcher
7912 * ----------------------------------------------------------------------------
7913 */
7914
7915/**
7916 * Simple dispatcher for events. The order of dispatch is from lower (less
7917 * dependent) to higher (more dependent) sub-modules.
7918 */
7919static void
7920internal_event_dispatch(as_clustering_internal_event* event)
7921{
7922 // Sub-module dispatch.
7923 quantum_interval_generator_event_dispatch(event);
7924 paxos_event_dispatch(event);
7925 register_event_dispatch(event);
7926
7927 // Dispatch to the main clustering module.
7928 clustering_event_handle(event);
7929}
7930
7931/*
7932 * ----------------------------------------------------------------------------
7933 * Public API.
7934 * ----------------------------------------------------------------------------
7935 */
7936
7937/**
7938 *
7939 * Initialize clustering subsystem.
7940 */
7941void
7942as_clustering_init()
7943{
7944 clustering_init();
7945}
7946
7947/**
7948 * Start clustering subsystem.
7949 */
7950void
7951as_clustering_start()
7952{
7953 clustering_start();
7954}
7955
7956/**
7957 * Stop clustering subsystem.
7958 */
7959void
7960as_clustering_stop()
7961{
7962 clustering_stop();
7963}
7964
7965/**
7966 * Reform the cluster with the same succession list.This would trigger the
7967 * generation of new partition info and the cluster would get a new cluster key.
7968 *
7969 * @return 0 if new clustering round started, -1 otherwise.
7970 */
7971int
7972as_clustering_cluster_reform()
7973{
7974 return clustering_cluster_reform();
7975}
7976
7977/**
7978 * Return the quantum interval, i.e., the interval at which cluster change
7979 * decisions are taken. The unit is milliseconds.
7980 */
7981uint64_t
7982as_clustering_quantum_interval()
7983{
7984 return quantum_interval();
7985}
7986
7987/**
7988 * TEMPORARY - used by paxos only.
7989 */
7990void
7991as_clustering_set_integrity(bool has_integrity)
7992{
7993 g_clustering.has_integrity = has_integrity;
7994}
7995
7996/*
7997 * ----------------------------------------------------------------------------
7998 * Clustering info command functions.
7999 * ----------------------------------------------------------------------------
8000 */
8001
8002/**
8003 * If false means than either this node is orphaned, or is undergoing a cluster
8004 * change.
8005 */
8006bool
8007as_clustering_has_integrity()
8008{
8009 return g_clustering.has_integrity;
8010}
8011
8012/**
8013 * Indicates if self node is orphaned.
8014 */
8015bool
8016as_clustering_is_orphan()
8017{
8018 return clustering_is_orphan();
8019}
8020
8021/**
8022 * Dump clustering state to the log.
8023 */
8024void
8025as_clustering_dump(bool verbose)
8026{
8027 clustering_dump(verbose);
8028}
8029
8030/**
8031 * Set the min cluster size.
8032 */
8033int
8034as_clustering_cluster_size_min_set(uint32_t new_cluster_size_min)
8035{
8036 CLUSTERING_LOCK();
8037 int rv = 0;
8038 uint32_t cluster_size = cf_vector_size(&g_register.succession_list);
8039 if (clustering_is_orphan() || cluster_size >= new_cluster_size_min) {
8040 INFO("changing value of min-cluster-size from %u to %u",
8041 g_config.clustering_config.cluster_size_min,
8042 new_cluster_size_min);
8043 g_config.clustering_config.cluster_size_min = new_cluster_size_min;
8044 }
8045 else {
8046 WARNING(
8047 "min-cluster-size %d should be <= current cluster size %d - ignoring",
8048 new_cluster_size_min, cluster_size);
8049 rv = -1;
8050 }
8051 CLUSTERING_UNLOCK();
8052 return rv;
8053}
8054
8055/**
8056 * Log a vector of node-ids at input severity spliting long vectors over
8057 * multiple lines. The call might not work if the vector is not protected
8058 * against multi-threaded access.
8059 *
8060 * @param context the logging context.
8061 * @param severity the log severity.
8062 * @param file_name the source file name for the log line.
8063 * @param line the source file line number for the log line.
8064 * @param message the message prefix for each log line. Message and node list
8065 * will be separated with a space. Can be NULL for no prefix.
8066 * @param nodes the vector of nodes.
8067 */
8068void
8069as_clustering_cf_node_vector_event(cf_fault_severity severity,
8070 cf_fault_context context, char* file_name, int line, char* message,
8071 cf_vector* nodes)
8072{
8073 as_clustering_cf_node_array_event(severity, context, file_name, line,
8074 message, vector_to_array(nodes), cf_vector_size(nodes));
8075}
8076
8077/**
8078 * Log an array of node-ids at input severity spliting long vectors over
8079 * multiple lines. The call might not work if the array is not protected against
8080 * multi-threaded access.
8081 *
8082 * @param context the logging context.
8083 * @param severity the log severity.
8084 * @param file_name the source file name for the log line.
8085 * @param line the source file line number for the log line.
8086 * @param message the message prefix for each log line. Message and node list
8087 * will be separated with a space. Can be NULL for no prefix.
8088 * @param nodes the array of nodes.
8089 * @param node_count the count of nodes in the array.
8090 */
8091void
8092as_clustering_cf_node_array_event(cf_fault_severity severity,
8093 cf_fault_context context, char* file_name, int line, char* message,
8094 cf_node* nodes, int node_count)
8095{
8096 if (!cf_context_at_severity(context, severity) && severity != CF_DETAIL) {
8097 return;
8098 }
8099
8100 // Also account the space following the nodeid.
8101 int node_str_len = 2 * (sizeof(cf_node)) + 1;
8102
8103 int message_length = 0;
8104 char copied_message[LOG_LENGTH_MAX()];
8105
8106 if (message) {
8107 // Limit the message length to allow at least one node to fit in the log
8108 // line. Accounting for the separator between message and node list.
8109 message_length = MIN(strnlen(message, LOG_LENGTH_MAX() - 1),
8110 LOG_LENGTH_MAX() - 1 - node_str_len) + 1;
8111
8112 // Truncate the message.
8113 strncpy(copied_message, message, message_length);
8114 message = copied_message;
8115 }
8116
8117 // Allow for the NULL terminator.
8118 int nodes_per_line = (LOG_LENGTH_MAX() - message_length - 1) / node_str_len;
8119 nodes_per_line = MAX(1, nodes_per_line);
8120
8121 // Have a buffer large enough to accomodate the message and nodes per line.
8122 char log_buffer[message_length + (nodes_per_line * node_str_len) + 1]; // For the NULL terminator.
8123 int output_node_count = 0;
8124
8125 // Marks the start of the nodeid list in the log line buffer.
8126 char* node_buffer_start = log_buffer;
8127 if (message) {
8128 node_buffer_start += sprintf(log_buffer, "%s ", message);
8129 }
8130
8131 for (int i = 0; i < node_count;) {
8132 char* buffer = node_buffer_start;
8133
8134 for (int j = 0; j < nodes_per_line && i < node_count; j++) {
8135 buffer += sprintf(buffer, "%"PRIx64" ", nodes[i]);
8136 output_node_count++;
8137 i++;
8138 }
8139
8140 // Overwrite the space from the last node on the log line only if there
8141 // is atleast one node output
8142 if (buffer != node_buffer_start) {
8143 *(buffer - 1) = 0;
8144 cf_fault_event(context, severity, file_name, line, "%s",
8145 log_buffer);
8146 }
8147 }
8148
8149 // Handle the empty vector case.
8150 if (output_node_count == 0) {
8151 sprintf(node_buffer_start, "(empty)");
8152 cf_fault_event(context, severity, file_name, line, "%s", log_buffer);
8153 }
8154}
8155