1/*
2 * hb.h
3 *
4 * Copyright (C) 2008-2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23#pragma once
24
25#include <stdbool.h>
26#include <stdint.h>
27
28#include "citrusleaf/cf_atomic.h"
29#include "citrusleaf/cf_vector.h"
30
31#include "msg.h"
32#include "socket.h"
33#include "tls.h"
34
35#include "fabric/hlc.h"
36
/**
 * Upper bound on the number of nodes in a cluster. A definition supplied
 * before this header is included takes precedence over the default below.
 */
#if !defined(AS_CLUSTER_SZ)
#define AS_CLUSTER_SZ 8
#endif
43
/** Smallest permitted heartbeat interval, in milliseconds. */
#define AS_HB_TX_INTERVAL_MS_MIN 50

/** Largest permitted heartbeat interval, in milliseconds (10 minutes). */
#define AS_HB_TX_INTERVAL_MS_MAX 600000

/** Smallest permitted value for max-intervals-missed. */
#define AS_HB_MAX_INTERVALS_MISSED_MIN 3
58
/**
 * Modes the heartbeat subsystem can operate in.
 */
typedef enum as_hb_mode_enum {
	AS_HB_MODE_UNDEF = 0,
	AS_HB_MODE_MULTICAST = 1,
	AS_HB_MODE_MESH = 2
} as_hb_mode;
68
/**
 * Versions of the heartbeat protocol.
 */
typedef enum as_hb_protocol_enum {
	AS_HB_PROTOCOL_UNDEF = 0,
	AS_HB_PROTOCOL_NONE = 1,
	AS_HB_PROTOCOL_RESET = 2,
	AS_HB_PROTOCOL_V3 = 3
} as_hb_protocol;
79
/**
 * Kinds of events the heartbeat subsystem publishes.
 */
typedef enum {
	AS_HB_NODE_ARRIVE = 0,
	AS_HB_NODE_DEPART,
	AS_HB_NODE_ADJACENCY_CHANGED,
	/*
	 * Sentinel, not an event.
	 * NOTE(review): presumably has to stay the last enumerator - confirm.
	 */
	AS_HB_NODE_EVENT_SENTINEL
} as_hb_event_type;
90
/**
 * Identifies a plugin that publishes and receives data through the heartbeat
 * subsystem. The outgoing heartbeat message buffer is populated and parsed in
 * the order in which the ids are listed here.
 */
typedef enum {
	/** The heartbeat subsystem itself. */
	AS_HB_PLUGIN_HB = 0,

	/**
	 * The older clustering subsystem.
	 * TODO: Use only one plugin id and register differently based on the
	 * clustering version.
	 */
	AS_HB_PLUGIN_FABRIC,

	/** The clustering subsystem. */
	AS_HB_PLUGIN_CLUSTERING,

	/** The skew monitor. */
	AS_HB_PLUGIN_SKEW_MONITOR,

	/** Dummy sentinel value. Should always be the last enumerator. */
	AS_HB_PLUGIN_SENTINEL
} as_hb_plugin_id;
121
/**
 * Field ids of the heartbeat message.
 *
 * New fields may only be appended at the end. The values are spelled out so
 * that an accidental insertion or reordering is visible in review.
 */
typedef enum {
	/** HB protocol identifier. */
	AS_HB_MSG_ID = 0,

	/** HB subsystem message type. */
	AS_HB_MSG_TYPE = 1,

	/** HB message source. */
	AS_HB_MSG_NODE = 2,

	/** Cluster name. */
	AS_HB_MSG_CLUSTER_NAME = 3,

	/** HLC timestamp. */
	AS_HB_MSG_HLC_TIMESTAMP = 4,

	/** Heartbeat endpoints advertised by this node. */
	AS_HB_MSG_ENDPOINTS = 5,

	/** Payload for compressed messages. */
	AS_HB_MSG_COMPRESSED_PAYLOAD = 6,

	/** Mesh info request. */
	AS_HB_MSG_INFO_REQUEST = 7,

	/** Mesh info reply. */
	AS_HB_MSG_INFO_REPLY = 8,

	/*
	 * ---- Plugin data fields. Potentially extensible ----
	 */

	/**
	 * Fabric data advertised by this node. Kept close to the hb endpoints to
	 * help compression, since it will most likely match them.
	 */
	AS_HB_MSG_FABRIC_DATA = 9,

	/**
	 * Valid only for pulse messages; carries the adjacency list and
	 * clusterid.
	 */
	AS_HB_MSG_HB_DATA = 10,

	/** Contains the cluster key and succession list. */
	AS_HB_MSG_PAXOS_DATA = 11,

	/**
	 * Local physical clock monotonic timestamp for when the message was sent.
	 */
	AS_HB_MSG_SKEW_MONITOR_DATA = 12
} as_hb_msg_fields;
197
198/**
199 * Heartbeat subsystem configuration.
200 */
201typedef struct as_hb_config_s
202{
203 /**
204 * Mode of operation. Mesh or Multicast for now.
205 */
206 as_hb_mode mode;
207
208 /**
209 * Binding interface config.
210 */
211 cf_serv_cfg bind_cfg;
212
213 /**
214 * Global TLS configuration.
215 */
216
217 cf_tls_info *tls;
218
219 /**
220 * Multicast mode only config for multicast groups.
221 */
222 cf_mserv_cfg multicast_group_cfg;
223
224 /**
225 * The interval at which heartbeat pulse messages are sent in milliseconds.
226 */
227 uint32_t tx_interval;
228
229 /**
230 * Max number of missed heartbeat intervals after which a node is considered
231 * expired.
232 */
233 uint32_t max_intervals_missed;
234
235 /**
236 * The ttl for multicast packets. Set to zero for default TTL.
237 */
238 uint8_t multicast_ttl;
239
240 /**
241 * HB protocol to use.
242 */
243 as_hb_protocol protocol;
244
245 /**
246 * Set to a value > 0 to override the MTU read from the network interface.
247 */
248 uint32_t override_mtu;
249
250 /**
251 * Mesh seeds from config file.
252 * Only used for during config parsing and initialization.
253 */
254 char* mesh_seed_addrs[AS_CLUSTER_SZ];
255 int mesh_seed_ports[AS_CLUSTER_SZ];
256 bool mesh_seed_tls[AS_CLUSTER_SZ];
257
258} as_hb_config;
259
260/**
261 * Heartbeat published event structure.
262 */
263typedef struct as_hb_event_node_s
264{
265 /**
266 * The type of the event.
267 */
268 as_hb_event_type evt;
269
270 /**
271 * The event nodeid.
272 */
273 cf_node nodeid;
274
275 /**
276 * The monotonic timestamp when this event happened.
277 */
278 cf_clock event_time;
279
280 /**
281 * The monotonic timestamp when this event was detected. Will differ from
282 * event_time for node depart events.
283 */
284 cf_clock event_detected_time;
285} as_hb_event_node;
286
287/**
288 * A hook to allow plugin to publish its data as a part of the heartbeat
289 * message.
290 */
291typedef void (*as_hb_plugin_set_data_fn)(msg* hb_message);
292
/**
 * Per-plugin data kept for an adjacent node.
 */
typedef struct as_hb_plugin_node_data_s {
	/** Heap allocated, node specific data blob belonging to the plugin. */
	void *data;

	/** Size of the data currently stored in the blob. */
	size_t data_size;

	/** Capacity of the allocated blob. */
	size_t data_capacity;
} as_hb_plugin_node_data;
313
314/**
315 * A function to parse plugin data for a node into an in memory object. Should
316 * be fast and never acquire locks.
317 *
318 * The parameter plugin_data->data will always point to a pre-allocated memory
319 * location. plugin_data->data_capacity will indicate the capacity of this
320 * memory. Implementations should reuse this pre-allocated data blob to avoid
321 * the overhead of heap allocations. If current data capacity is greater than
322 * the new data size please invoke cf_realloc and get a new block for current
323 * data and update plugin_data->data and plugin_data->data_capacity accordingly.
324 *
325 * This function should always update data_size correctly before returning. Set
326 * plugin_data->data_size = 0 for no plugin data.
327 *
328 * @param hb_message the heartbeat message.
329 * @param source the source node.
330 * @param plugin_data_prev plugin data structure from the previous heartbeat to
331 * be used to accumulate historical data.
332 * Field plugin_data_prev->data_size will be zero if this the first heartbeat
333 * from the source.
334 * @param plugin_data (output) plugin data structure to output parsed data.
335 */
336typedef void (*as_hb_plugin_parse_data_fn)(msg* hb_message, cf_node source, as_hb_plugin_node_data* plugin_data_prev, as_hb_plugin_node_data* plugin_data);
337
338/**
339 * A listener for detecting changes to this plugin's data for a particular node.
340 * Does not supply old and new values of the data, because does not seem to be
341 * required currently and to keep implementation simple.
342 *
343 * @param node the node whose plugin data changed.
344 */
345typedef void (*as_hb_plugin_data_changed_fn)(cf_node nodeid);
346
347/**
348 * A plugin allows a module to pushing and read data with heartbeat pulse
349 * messages.
350 */
351typedef struct as_hb_plugin_s
352{
353 /**
354 * The plugin id.
355 */
356 as_hb_plugin_id id;
357
358 /**
359 * Fixed plugin data size on wire.
360 */
361 size_t wire_size_fixed;
362
363 /**
364 * Additional plugin data size on wire per node in the adjacency list.
365 */
366 size_t wire_size_per_node;
367
368 /**
369 * The function which adds this plugin's data to the pulse message. Can be
370 * NULL. This function can hold the plugin module's locks.
371 */
372 as_hb_plugin_set_data_fn set_fn;
373
374 /**
375 * A function will parses and reads this plugins data from an incoming
376 * message. Can be NULL. This function SHOULD NOT hold the plugin module's
377 * locks to prevent deadlocks.
378 */
379 as_hb_plugin_parse_data_fn parse_fn;
380
381 /**
382 * A function invoked when plugin data for a particular node changed.
383 * Can be NULL. This function can hold the plugin module's locks.
384 */
385 as_hb_plugin_data_changed_fn change_listener;
386} as_hb_plugin;
387
/*
 * -----------------------------------------------------------------
 * HB subsystem public API
 * -----------------------------------------------------------------
 */

/*
 * None of the functions below take arguments. They are declared with an
 * explicit (void) so that each is a real prototype: an empty list () leaves
 * the parameters unspecified in C11 and lets a call with stray arguments
 * compile silently.
 *
 * NOTE(review): the intended call order (init, start, shutdown) is inferred
 * from the names only - confirm against the implementation.
 */

void as_hb_init(void);

void as_hb_start(void);

void as_hb_shutdown(void);

bool as_hb_self_is_duplicate(void);
401
402bool as_hb_node_is_adjacent(cf_node nodeid);
403
404typedef void (*as_hb_event_fn)(int nevents, as_hb_event_node* events, void* udata);
405
406void as_hb_register_listener(as_hb_event_fn event_callback, void* udata);
407
408void as_hb_dump(bool verbose);
409
410as_hb_protocol as_hb_protocol_get();
411
412int as_hb_protocol_set(as_hb_protocol protocol);
413
/*
 * Accessors for heartbeat tunables. The getters take no arguments and are
 * declared with an explicit (void) so that they are real prototypes.
 * as_hb_node_timeout_get is declared exactly once.
 *
 * NOTE(review): the meaning of the int results of the setters (presumably a
 * status code) is not visible from this header - confirm against the
 * implementation.
 */

uint32_t as_hb_node_timeout_get(void);

void as_hb_override_mtu_set(int mtu);

uint32_t as_hb_tx_interval_get(void);

int as_hb_tx_interval_set(uint32_t new_interval);

uint32_t as_hb_max_intervals_missed_get(void);

int as_hb_max_intervals_missed_set(uint32_t new_max);

bool as_hb_max_cluster_size_isvalid(uint32_t max_cluster_size);
429
430/*
431 * -----------------------------------------------------------------
432 * HB plugin subsystem public API.
433 * -----------------------------------------------------------------
434 */
435
436void as_hb_plugin_register(as_hb_plugin* plugin);
437
438bool as_hb_is_alive(cf_node nodeid);
439
/*
 * Takes no arguments; declared with (void) so that it is a real prototype
 * rather than a declaration with unspecified parameters.
 */
void as_hb_config_validate(void);
441
442void as_hb_maximal_clique_evict(cf_vector* nodes, cf_vector* nodes_to_evict);
443
444int as_hb_plugin_data_get(cf_node nodeid, as_hb_plugin_id plugin, as_hb_plugin_node_data* plugin_data, as_hlc_msg_timestamp* msg_hlc_ts, cf_clock* recv_monotonic_ts);
445
446typedef void (*as_hb_plugin_data_iterate_fn)(cf_node nodeid, void* plugin_data, size_t plugin_data_size, cf_clock recv_monotonic_ts, as_hlc_msg_timestamp* msg_hlc_ts, void* udata);
447
448void as_hb_plugin_data_iterate(cf_vector* nodes, as_hb_plugin_id plugin, as_hb_plugin_data_iterate_fn iterate_fn, void* udata);
449
450void as_hb_plugin_data_iterate_all(as_hb_plugin_id plugin, as_hb_plugin_data_iterate_fn iterate_fn, void* udata);
451
452/*
453 * -----------------------------------------------------------------
454 * Info public API
455 * -----------------------------------------------------------------
456 */
457
458void as_hb_info_config_get(cf_dyn_buf* db);
459
460void as_hb_info_endpoints_get(cf_dyn_buf* db);
461
462void as_hb_info_listen_addr_get(as_hb_mode* mode, char* addr_port, size_t addr_port_capacity);
463
464void as_hb_info_duplicates_get(cf_dyn_buf* db);
465
/*
 * -----------------------------------------------------------------
 * Mesh mode public API
 * -----------------------------------------------------------------
 */

/*
 * NOTE(review): the meaning of the int results (presumably a status code) is
 * not visible from this header - confirm against the implementation.
 *
 * as_hb_config_validate is intentionally not redeclared here: it is not a
 * mesh mode function and this header already declares it under the plugin
 * subsystem API.
 */

int as_hb_mesh_tip(char *host, int port, bool tls);

int as_hb_mesh_tip_clear(char *host, int port);

int as_hb_mesh_tip_clear_all(uint32_t *cleared);
479