1/*
2 * smd.c
3 *
4 * Copyright (C) 2018 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "base/smd.h"
28
29#include <errno.h>
30#include <stdbool.h>
31#include <stddef.h>
32#include <stdint.h>
33#include <sys/stat.h>
34#include <unistd.h>
35
36#include "jansson.h"
37
38#include "citrusleaf/alloc.h"
39#include "citrusleaf/cf_hash_math.h"
40#include "citrusleaf/cf_queue.h"
41#include "citrusleaf/cf_vector.h"
42
43#include "bits.h"
44#include "cf_mutex.h"
45#include "cf_thread.h"
46#include "dynbuf.h"
47#include "fault.h"
48#include "msg.h"
49#include "node.h"
50#include "shash.h"
51
52#include "base/cfg.h"
53#include "fabric/exchange.h"
54#include "fabric/fabric.h"
55#include "fabric/hb.h"
56
57#include "warnings.h"
58
59
60//==========================================================
61// Typedefs & constants.
62//
63
// Fabric message field ids. These numbers travel on the wire, so a slot is
// never renumbered or reused - retired fields stay behind as UNUSED_<id>.
typedef enum {
	SMD_MSG_TID = 0,
	SMD_MSG_VERSION = 1,
	SMD_MSG_CLUSTER_KEY = 2,
	SMD_MSG_OP = 3,
	SMD_MSG_MODULE_ID = 4,
	SMD_MSG_UNUSED_5 = 5, // used to be SMD_MSG_ACTION
	SMD_MSG_UNUSED_6 = 6, // used to be SMD_MSG_MODULE
	SMD_MSG_UNUSED_7 = 7, // used to be SMD_MSG_KEY
	SMD_MSG_UNUSED_8 = 8, // used to be SMD_MSG_VALUE
	SMD_MSG_UNUSED_9 = 9, // used to be SMD_MSG_GEN_ARRAY
	SMD_MSG_TS_ARRAY = 10,
	SMD_MSG_UNUSED_11 = 11, // used to be SMD_MSG_MODULE_NAME
	SMD_MSG_UNUSED_12 = 12, // used to be SMD_MSG_OPTIONS

	SMD_MSG_VERSION_LIST = 13,
	SMD_MSG_UNUSED_14 = 14, // used to be SMD_MSG_MODULE_COUNTS
	SMD_MSG_KEY_LIST = 15,
	SMD_MSG_VALUE_LIST = 16,
	SMD_MSG_GEN_LIST = 17,

	SMD_MSG_SINGLE_KEY = 18,
	SMD_MSG_SINGLE_VALUE = 19,
	SMD_MSG_SINGLE_GENERATION = 20,
	SMD_MSG_SINGLE_TIMESTAMP = 21,

	SMD_MSG_COMMITTED_CL_KEY = 22,

	NUM_SMD_FIELDS // count of field ids - keep last
} smd_msg_fields;
95
// Field-type template for SMD fabric messages - one entry per smd_msg_fields
// id, in id order. Registered with fabric in as_smd_start(). Retired (UNUSED)
// fields still need an entry so the table stays aligned with the enum.
static const msg_template smd_mt[] = {
	{ SMD_MSG_TID, M_FT_UINT64 },
	{ SMD_MSG_VERSION, M_FT_UINT32 },
	{ SMD_MSG_CLUSTER_KEY, M_FT_UINT64 },
	{ SMD_MSG_OP, M_FT_UINT32 },
	{ SMD_MSG_MODULE_ID, M_FT_UINT32 },
	{ SMD_MSG_UNUSED_5, M_FT_ARRAY_UINT32 },
	{ SMD_MSG_UNUSED_6, M_FT_ARRAY_STR },
	{ SMD_MSG_UNUSED_7, M_FT_ARRAY_STR },
	{ SMD_MSG_UNUSED_8, M_FT_ARRAY_STR },
	{ SMD_MSG_UNUSED_9, M_FT_ARRAY_UINT32 },
	{ SMD_MSG_TS_ARRAY, M_FT_ARRAY_UINT64 },
	{ SMD_MSG_UNUSED_11, M_FT_STR },
	{ SMD_MSG_UNUSED_12, M_FT_UINT32 },

	{ SMD_MSG_VERSION_LIST, M_FT_MSGPACK },
	{ SMD_MSG_UNUSED_14, M_FT_MSGPACK },
	{ SMD_MSG_KEY_LIST, M_FT_MSGPACK },
	{ SMD_MSG_VALUE_LIST, M_FT_MSGPACK },
	{ SMD_MSG_GEN_LIST, M_FT_MSGPACK },

	{ SMD_MSG_SINGLE_KEY, M_FT_STR },
	{ SMD_MSG_SINGLE_VALUE, M_FT_STR },
	{ SMD_MSG_SINGLE_GENERATION, M_FT_UINT32 },
	{ SMD_MSG_SINGLE_TIMESTAMP, M_FT_UINT64 },

	{ SMD_MSG_COMMITTED_CL_KEY, M_FT_UINT64 }
};

// Compile-time check that every field id has a template entry.
COMPILER_ASSERT(sizeof(smd_mt) / sizeof(msg_template) == NUM_SMD_FIELDS);

// Scratch size handed to as_fabric_register_msg_fn() for SMD messages.
#define SMD_MSG_SCRATCH_SIZE 64 // TODO - rethink... could be smaller?
128
// Operation types handled by the SMD event loop. Values up to SMD_OP_SET_NACK
// are carried in fabric messages (SMD_MSG_OP) and must never change.
typedef enum {
	// To principal.
	SMD_OP_SET_TO_PR = 0,
	SMD_OP_REPORT_ALL_VERS_TO_PR = 1,
	SMD_OP_REPORT_VER_TO_PR = 2,
	SMD_OP_FULL_TO_PR = 3,
	SMD_OP_ACK_TO_PR = 4,

	// From principal.
	SMD_OP_SET_FROM_PR = 5,
	SMD_OP_REQ_VER_FROM_PR = 6,
	SMD_OP_FULL_FROM_PR = 7,
	SMD_OP_REQ_FULL_FROM_PR = 8,

	// Reply to the originator of a set.
	SMD_OP_SET_ACK = 9,
	SMD_OP_SET_NACK = 10,

	// Must be last - these are internal ops and don't go on the wire.
	SMD_OP_CLUSTER_CHANGED = 11,
	SMD_OP_START_SET = 12,

	NUM_SMD_OP_TYPES // count of op types - keep last
} smd_op_type;
151
// Printable name for each smd_op_type, indexed by the enum value. Used for
// logging via OP_TYPE_AS_STRING().
static const char* const op_type_str[] = {
	[SMD_OP_SET_TO_PR] = "set-to-pr",
	[SMD_OP_REPORT_ALL_VERS_TO_PR] = "report-all-vers-to-pr",
	[SMD_OP_REPORT_VER_TO_PR] = "report-ver-to-pr",
	[SMD_OP_FULL_TO_PR] = "full-to-pr",
	[SMD_OP_ACK_TO_PR] = "ack-to-pr",

	[SMD_OP_SET_FROM_PR] = "set-from-pr",
	[SMD_OP_REQ_VER_FROM_PR] = "req-ver-from-pr",
	[SMD_OP_FULL_FROM_PR] = "full-from-pr",
	[SMD_OP_REQ_FULL_FROM_PR] = "req-full-from-pr",

	[SMD_OP_SET_ACK] = "set-ack",
	[SMD_OP_SET_NACK] = "set-nack",

	[SMD_OP_CLUSTER_CHANGED] = "cluster-changed",
	[SMD_OP_START_SET] = "start-set"
};

// Compile-time check that the name table covers every op type.
COMPILER_ASSERT(sizeof(op_type_str) / sizeof(const char*) == NUM_SMD_OP_TYPES);
172
// Per-module state (see smd_module.state). Each value indexes state_str[].
typedef enum {
	STATE_PR = 0,
	STATE_NPR = 1,
	STATE_MERGING = 2,
	STATE_DIRTY = 3,
	STATE_CLEAN = 4,
	STATE_SET = 5,

	NUM_SMD_STATES // count of states - keep last
} smd_state;
183
// Printable name for each smd_state, indexed by the enum value. Used in log
// lines and in the as_smd_get_info() output.
static const char* const state_str[] = {
	[STATE_PR] = "pr",
	[STATE_NPR] = "npr",
	[STATE_MERGING] = "merging",
	[STATE_DIRTY] = "dirty",
	[STATE_CLEAN] = "clean",
	[STATE_SET] = "set"
};

// Compile-time check that the name table covers every state.
COMPILER_ASSERT(sizeof(state_str) / sizeof(const char*) == NUM_SMD_STATES);
194
// Global SMD state - a single instance exists (g_smd).
typedef struct smd_s {
	// Cluster view - overwritten from each cluster-changed op.
	uint64_t cl_key;
	uint32_t node_count;
	cf_node succession[AS_CLUSTER_SZ]; // descending order

	// pending_set_q is popped by run_smd() only while this node is principal
	// and no principal retransmit is pending; event_q is the main event feed.
	cf_queue pending_set_q; // elements are (smd_op*)
	cf_queue event_q; // elements are (smd_op*)

	// Taken via smd_lock()/smd_unlock() around event handling and public reads.
	cf_mutex lock;

	// set_h maps a uint32_t key to an (smd_set_entry*) - see as_smd_start().
	// NOTE(review): set_tid presumably generates those keys - confirm.
	uint32_t set_tid;
	cf_shash* set_h;
} smd;
208
// One hash element: a string key mapped to a uint32_t value, with a link to
// another element.
// NOTE(review): 'next' presumably chains colliding keys within a row - confirm
// against smd_hash_put().
typedef struct smd_hash_ele_s {
	struct smd_hash_ele_s* next;
	const char* key;
	uint32_t value;
} smd_hash_ele;

// Number of rows in every smd_hash table.
#define N_HASH_ROWS 256

// Fixed-size hash table - the row heads are embedded directly in the struct.
typedef struct smd_hash_s {
	smd_hash_ele table[N_HASH_ROWS];
} smd_hash;
220
// Per-module SMD state - one slot per as_smd_id in g_module_table.
typedef struct smd_module_s {
	as_smd_id id;
	const char* name;
	// Reported as "committed_key" / "committed_tid" by as_smd_get_info().
	uint64_t cv_key;
	uint64_t cv_tid;

	// Registered in as_smd_module_load(); conflict_cb defaults to
	// smd_item_is_less when the caller passes NULL.
	as_smd_accept_fn accept_cb;
	as_smd_conflict_fn conflict_cb;

	smd_hash db_h; // key is (char*), value is uint32_t
	cf_vector db;

	bool in_use; // EE modules may not be in use

	// For principal.
	// A retry_next_ms of 0 means no retransmit is scheduled.
	uint64_t retry_next_ms;
	uint32_t retry_msg_count;
	msg* retry_msgs[AS_CLUSTER_SZ];

	uint64_t merge_tids[AS_CLUSTER_SZ];
	smd_hash merge_h;
	cf_vector merge;

	smd_state state;

	// For set ack/nack.
	cf_node set_src;
	uint64_t set_key;
	uint64_t set_tid;
} smd_module;
251
// One event for the SMD thread - built from a parsed fabric message, a
// cluster-changed notification, or a local as_smd_set() call. Allocated zeroed
// by smd_op_create() and released by smd_op_destroy().
typedef struct smd_op_s {
	smd_op_type type;

	cf_node src;
	uint32_t node_index; // index of src in g_smd.succession - set in smd_event()

	smd_module* module; // NULL for ops that are not module-specific

	uint64_t cl_key;
	uint64_t committed_key;

	uint64_t tid;

	cf_vector items; // elements are (as_smd_item*) - owned by the op

	// For cluster changed events.
	uint32_t node_count;
	cf_node* succession; // heap array - freed in smd_op_destroy()

	// For report all versions events.
	uint32_t version_count;
	uint64_t* version_list; // heap array - freed in smd_op_destroy()

	// For originator of set operations.
	as_smd_set_fn set_cb;
	void* set_udata;
	uint64_t set_timeout;
} smd_op;
280
// Originator-side record of an in-flight set - stored (by pointer) in
// g_smd.set_h and visited by set_orig_reduce_cb().
typedef struct smd_set_entry_s {
	uint64_t cl_key;

	// cb (may be NULL) is called with false when deadline_ms passes.
	as_smd_set_fn cb;
	void* udata;
	uint64_t deadline_ms;

	// A retry_next_ms of 0 means no retransmit is scheduled.
	uint64_t retry_next_ms;
	as_smd_item* item; // destroyed along with the entry
	smd_module* module;
} smd_set_entry;
292
// Context for set_orig_reduce_cb(): now_ms is the input timestamp, wait_ms
// accumulates the smallest wait found across all entries.
typedef struct set_orig_reduce_udata_s {
	int wait_ms;
	uint64_t now_ms;
} set_orig_reduce_udata;
297
// Headroom when sizing the version list accepted in smd_msg_parse().
#define NUM_FUTURE_MODULES 10 // hopefully won't add more than this

// Buffer budget for the name and state parts of smd_module_string.
#define MODULE_NAME_MAX_LEN 10
#define STATE_NAME_MAX_LEN 10

// Fixed-size string returned by value from smd_module_as_string().
typedef struct smd_module_string_s {
	// To represent "%s:%s:%lx-%lu".
	char s[MODULE_NAME_MAX_LEN + 1 + STATE_NAME_MAX_LEN + 1 + 16 + 1 + 20 + 1];
} smd_module_string;

static const char smd_empty_value[] = "";

#define REPORT_VER_DELAY_US 50000 // 50 milliseconds
#define SMD_RETRY_MS 3000 // 3 seconds

// as_smd_set() falls back to DEFAULT_SET_TIMEOUT_MS when given a timeout of 0.
#define DEFAULT_SET_TIMEOUT_MS 2000 // 2 seconds
#define SET_RETRY_MS 100

#define MAX_PATH_LEN 1024
317
318
319//==========================================================
320// Globals.
321//
322
// The single SMD instance - only the lock needs a static initializer, the
// queues and set hash are created in as_smd_start().
static smd g_smd = { .lock = CF_MUTEX_INIT };

// In alpha order.
// Indexed by as_smd_id - the remaining fields are filled in by
// as_smd_module_load().
static smd_module g_module_table[] = {
	[AS_SMD_MODULE_EVICT] = { .name = "evict" },
	[AS_SMD_MODULE_ROSTER] = { .name = "roster" },
	[AS_SMD_MODULE_SECURITY] = { .name = "security" },
	[AS_SMD_MODULE_SINDEX] = { .name = "sindex" },
	[AS_SMD_MODULE_TRUNCATE] = { .name = "truncate" },
	[AS_SMD_MODULE_UDF] = { .name = "UDF" }
};

// Compile-time check that every module id has a table slot.
COMPILER_ASSERT(sizeof(g_module_table) / sizeof(smd_module) ==
		AS_SMD_NUM_MODULES);
337
338
339//==========================================================
340// Forward declarations.
341//
342
343// Callbacks.
344static int smd_msg_recv_cb(cf_node node_id, msg* m, void* udata);
345static void smd_cluster_changed_cb(const as_exchange_cluster_changed_event* ex_event, void* udata);
346static void smd_set_blocking_cb(bool result, void* udata);
347
348// Parse fabric msg.
349static bool smd_msg_parse(msg* m, smd_op* op);
350static bool smd_msg_parse_items(msg* m, smd_op* op);
351
352// Event loop.
353static void* run_smd(void*);
354static int pr_try_retransmit(void);
355static int set_orig_try_retransmit_or_expire(void);
356static int set_orig_reduce_cb(const void* key, void* value, void* udata);
357static void smd_event(smd_op* op);
358
359// Events.
360static void op_cluster_changed(smd_op* op);
361static void op_start_set(smd_op* op);
362
363static void op_set_to_pr(smd_op* op);
364static void op_report_all_vers_to_pr(smd_op* op);
365static void op_report_ver_to_pr(smd_op* op);
366static void op_full_to_pr(smd_op* op);
367static void op_ack_to_pr(smd_op* op);
368
369static void op_set_from_pr(smd_op* op);
370static void op_req_ver_from_pr(smd_op* op);
371static void op_full_from_pr(smd_op* op);
372static void op_req_full_from_pr(smd_op* op);
373static void op_finish_set(smd_op* op, bool success);
374
375// Pending set queue.
376static bool pending_set_q_contains(const smd_op* op);
377static int pending_set_q_reduce_cb(void* ptr, void* udata);
378
379// Fabric msg send/reply.
380static void send_set_from_pr(smd_module* module, const as_smd_item* item);
381static void send_full_from_pr(smd_module* module);
382static void send_report_all_ver_to_pr(void);
383static void send_report_ver_to_pr(smd_module* module);
384static void send_ack_to_pr(smd_op* op);
385static void send_set_reply(smd_module* module, bool success);
386static void send_set_from_orig(uint32_t set_tid, smd_set_entry* entry);
387
388// Fabric msg retransmit.
389static void pr_send_msgs(smd_module* module);
390static void pr_set_retry_msg(smd_module* module, msg* m);
391static bool pr_mark_reply(smd_op* op, smd_state state);
392static void pr_clear_retry_msgs(smd_module* module);
393
394// Call module accept_cb.
395static void module_accept_item(smd_module* module, const as_smd_item* item);
396static void module_accept_list(smd_module* module, const cf_vector* list);
397static void module_accept_startup(smd_module* module);
398
399// Module.
400static void module_regen_key2index(smd_module* module);
401static void module_append_item(smd_module* module, as_smd_item* item);
402static void module_fill_msg(smd_module* module, msg* m);
403static void module_merge_list(smd_module* module, cf_vector* list);
404static void module_set_npr(smd_module* module, as_smd_item* item);
405static bool module_set_pr(smd_module* module, char* key, char* value);
406static void module_restore_from_disk(smd_module* module);
407static void module_commit_to_disk(smd_module* module);
408static void module_set_default_items(smd_module* module, const cf_vector* default_items);
409
410// Hash.
411static void smd_hash_init(smd_hash* h);
412static void smd_hash_clear(smd_hash* h);
413static void smd_hash_put(smd_hash* h, const char* key, uint32_t value);
414static bool smd_hash_get(const smd_hash* h, const char* key, uint32_t* value);
415static uint32_t smd_hash_get_row_i(const char* key);
416
417// as_smd_item.
418static as_smd_item* smd_item_create_copy(const char* key, const char* value, uint64_t ts, uint32_t gen);
419static as_smd_item* smd_item_create_handoff(char* key, char* value, uint64_t ts, uint32_t gen);
420static bool smd_item_is_less(const as_smd_item* item0, const as_smd_item* item1);
421static void smd_item_destroy(as_smd_item* item);
422
423static char* smd_item_value_ndup(uint8_t* value, uint32_t sz);
424static char* smd_item_value_dup(const char* value);
425static void smd_item_value_destroy(char* value);
426
427
428//==========================================================
429// Inlines & macros.
430//
431
// Crash (cf_crash) if the wrapped expression evaluates to non-zero. Expands to
// a bare brace block, so take care when using it as the body of an unbraced
// if/else.
#define JSON_ENFORCE(x) { \
	if ((x) != 0) { \
		cf_crash(AS_SMD, "json alloc error"); \
	} \
}
437
438static inline smd_module*
439smd_get_module(as_smd_id id)
440{
441 cf_assert(id < AS_SMD_NUM_MODULES, AS_SMD, "invalid id %d", id);
442 return &g_module_table[id];
443}
444
445static inline bool
446smd_is_pr(void)
447{
448 return g_smd.succession[0] == g_config.self_node;
449}
450
451static inline void
452smd_lock()
453{
454 cf_mutex_lock(&g_smd.lock);
455}
456
457static inline void
458smd_unlock()
459{
460 cf_mutex_unlock(&g_smd.lock);
461}
462
463static inline void
464smd_set_entry_destroy(smd_set_entry* entry)
465{
466 if (entry != NULL) {
467 smd_item_destroy(entry->item);
468 cf_free(entry);
469 }
470}
471
// Define a cf_vector of (as_smd_item*) named _x with room for _cnt elements,
// via cf_vector_define(). The expansion already ends in a semicolon.
#define item_vec_define(_x, _cnt) \
	cf_vector_define(_x, sizeof(as_smd_item*), _cnt, 0);
474
475static inline void
476item_vec_init(cf_vector* vec, uint32_t count)
477{
478 cf_vector_init(vec, sizeof(as_smd_item*), count, 0);
479}
480
481static inline const as_smd_item*
482item_vec_get_const(const cf_vector* vec, uint32_t i)
483{
484 return (const as_smd_item*)cf_vector_get_ptr(vec, i);
485}
486
487static inline as_smd_item*
488item_vec_get(cf_vector* vec, uint32_t i)
489{
490 return (as_smd_item*)cf_vector_get_ptr(vec, i);
491}
492
// Store the item pointer at index i. Does not touch whatever pointer was
// there before - see item_vec_replace() for the destroying variant.
static inline void
item_vec_set(cf_vector* vec, uint32_t i, const as_smd_item* item)
{
	cf_vector_set_ptr(vec, i, item);
}
498
// Append the item pointer to the end of the vector.
static inline void
item_vec_append(cf_vector* vec, const as_smd_item* item)
{
	cf_vector_append_ptr(vec, item);
}
504
// Drop all item pointers from the vector without destroying the items - for
// use when ownership of the items has moved elsewhere.
static inline void
item_vec_disown_items(cf_vector* vec)
{
	cf_vector_clear(vec);
}
510
511static inline void
512item_vec_handoff(cf_vector* dst, cf_vector* src)
513{
514 *dst = *src;
515 memset(src, 0, sizeof(cf_vector)); // to zero .count and .vector
516}
517
518static inline void
519item_vec_replace(cf_vector* vec, uint32_t i, as_smd_item* item)
520{
521 as_smd_item* old_item = item_vec_get(vec, i);
522 char* tmp = item->key;
523
524 item->key = old_item->key; // keep this since it's pointed to by the hash
525 old_item->key = tmp; // to be destroyed below in smd_item_destroy()
526
527 item_vec_set(vec, i, item);
528 smd_item_destroy(old_item);
529}
530
531static inline void
532item_vec_destroy(cf_vector* vec)
533{
534 for (uint32_t i = 0; i < cf_vector_size(vec); i++) {
535 smd_item_destroy(item_vec_get(vec, i));
536 }
537
538 cf_vector_destroy(vec);
539}
540
541static inline smd_op*
542smd_op_create(void)
543{
544 return (smd_op*)cf_calloc(1, sizeof(smd_op));
545}
546
547static inline void
548smd_op_handoff(smd_op* dst, smd_op* src)
549{
550 *dst = *src; // includes .items vector
551 memset(&src->items, 0, sizeof(src->items)); // to zero .count and .vector
552}
553
554static inline void
555smd_op_destroy(smd_op* op)
556{
557 item_vec_destroy(&op->items);
558 cf_free(op->succession);
559 cf_free(op->version_list);
560 cf_free(op);
561}
562
563// Use MODULE_AS_STRING() - see below.
564static inline smd_module_string
565smd_module_as_string(const smd_module* module)
566{
567 smd_module_string str;
568
569 if (module == NULL) {
570 strcpy(str.s, "all");
571 }
572 else {
573 sprintf(str.s, "%s:%s:%lx-%lu", module->name, state_str[module->state],
574 module->cv_key, module->cv_tid);
575 }
576
577 return str;
578}
579
// Yields the character buffer of a temporary smd_module_string - intended for
// direct use as a log-call argument.
#define MODULE_AS_STRING(_module) (smd_module_as_string(_module).s)

// Name of an op type, or "INVALID_OP_TYPE" when the value is out of range.
#define OP_TYPE_AS_STRING(_type) \
	(((0 <= (_type) && (_type) < NUM_SMD_OP_TYPES)) ? \
	op_type_str[_type] : "INVALID_OP_TYPE")

// Detail-level log line prefixed with module and op type. Relies on a local
// variable named 'module' being in scope at the call site.
#define OP_TYPE_DETAIL(_type, _format, ...) \
	cf_detail(AS_SMD, "{%s} %s - " _format, MODULE_AS_STRING(module), \
	OP_TYPE_AS_STRING(_type), ##__VA_ARGS__)

// As OP_TYPE_DETAIL, taking the type from a local variable named 'op'.
#define OP_DETAIL(_format, ...) OP_TYPE_DETAIL(op->type, _format, ##__VA_ARGS__)
591
592
593//==========================================================
594// Public API.
595//
596
597void
598as_smd_module_load(as_smd_id id, as_smd_accept_fn accept_cb,
599 as_smd_conflict_fn conflict_cb, const cf_vector* default_items)
600{
601 smd_module* module = smd_get_module(id);
602
603 module->accept_cb = accept_cb;
604 module->conflict_cb = conflict_cb == NULL ? smd_item_is_less : conflict_cb;
605
606 smd_hash_init(&module->db_h);
607 smd_hash_init(&module->merge_h);
608
609 module->id = id;
610 module->in_use = true;
611
612 module_restore_from_disk(module);
613 module_set_default_items(module, default_items);
614 module_accept_startup(module);
615}
616
// Start the SMD subsystem: create the two op queues and the originator set
// hash, register the fabric message handler and the exchange listener, then
// launch the detached run_smd() thread. Crashes if a queue cannot be created.
void
as_smd_start(void)
{
	if (! cf_queue_init(&g_smd.pending_set_q, sizeof(smd_op*), CF_QUEUE_ALLOCSZ,
			true)) {
		cf_crash(AS_SMD, "failed to create set queue");
	}

	if (! cf_queue_init(&g_smd.event_q, sizeof(smd_op*), CF_QUEUE_ALLOCSZ,
			true)) {
		cf_crash(AS_SMD, "failed to create event queue");
	}

	// Keys are uint32_t, values are (smd_set_entry*).
	g_smd.set_h = cf_shash_create(cf_shash_fn_u32, sizeof(uint32_t),
			sizeof(smd_set_entry*), 64, 0);

	as_fabric_register_msg_fn(M_TYPE_SMD, smd_mt, sizeof(smd_mt),
			SMD_MSG_SCRATCH_SIZE, smd_msg_recv_cb, NULL);

	as_exchange_register_listener(smd_cluster_changed_cb, NULL);

	cf_thread_create_detached(run_smd, NULL);
}
640
641void
642as_smd_set(as_smd_id id, const char* key, const char* value,
643 as_smd_set_fn set_cb, void* udata, uint64_t timeout)
644{
645 smd_op* op = smd_op_create();
646
647 op->type = SMD_OP_START_SET;
648 op->src = g_config.self_node;
649 op->module = smd_get_module(id);
650
651 op->set_cb = set_cb;
652 op->set_udata = udata;
653 op->set_timeout = (timeout == 0 ? DEFAULT_SET_TIMEOUT_MS : timeout);
654
655 item_vec_init(&op->items, 1);
656 item_vec_append(&op->items, smd_item_create_copy(key, value, 0, 0));
657
658 cf_queue_push(&g_smd.event_q, &op);
659}
660
661bool
662as_smd_set_blocking(as_smd_id id, const char* key, const char* value,
663 uint64_t timeout)
664{
665 cf_detail(AS_SMD, "{%d} blocking-set start - key %s", id, key);
666
667 cf_queue q;
668
669 cf_queue_init(&q, sizeof(bool), 1, true);
670 as_smd_set(id, key, value, smd_set_blocking_cb, &q, timeout);
671
672 bool result;
673
674 cf_queue_pop(&q, &result, CF_QUEUE_FOREVER);
675 cf_queue_destroy(&q);
676
677 cf_detail(AS_SMD, "{%d} blocking-set finished - key %s success %s", id, key,
678 (result ? "true" : "false"));
679
680 return result;
681}
682
// Invoke cb on the module's item vector while holding the SMD lock. The
// vector is passed by pointer, so cb should not hold on to it after returning.
void
as_smd_get_all(as_smd_id id, as_smd_get_all_fn cb, void* udata)
{
	smd_module* module = smd_get_module(id);

	smd_lock();
	cb(&module->db, udata);
	smd_unlock();
}
692
// Append an SMD status report to db, under the SMD lock. Output is a global
// "smd:..." section followed by one "<module>:..." section per in-use module,
// each terminated by ';' with ',' between the name=value pairs.
void
as_smd_get_info(cf_dyn_buf* db)
{
	smd_lock();

	// Global section.
	cf_dyn_buf_append_string(db, "smd:");
	cf_dyn_buf_append_string(db, "n_pending_sets=");
	cf_dyn_buf_append_uint32(db, (uint32_t)cf_queue_sz(&g_smd.pending_set_q));
	cf_dyn_buf_append_char(db, ',');
	cf_dyn_buf_append_string(db, "n_events=");
	cf_dyn_buf_append_uint32(db, (uint32_t)cf_queue_sz(&g_smd.event_q));
	cf_dyn_buf_append_char(db, ',');
	cf_dyn_buf_append_string(db, "n_nodes=");
	cf_dyn_buf_append_uint32(db, g_smd.node_count);
	cf_dyn_buf_append_char(db, ',');
	cf_dyn_buf_append_string(db, "principal=");
	cf_dyn_buf_append_uint64_x(db, g_smd.succession[0]);
	cf_dyn_buf_append_char(db, ',');
	cf_dyn_buf_append_string(db, "cluster_key=");
	cf_dyn_buf_append_uint64_x(db, g_smd.cl_key);
	cf_dyn_buf_append_char(db, ';');

	// Per-module sections - modules not in use are left out.
	for (uint32_t i = 0; i < AS_SMD_NUM_MODULES; i++) {
		smd_module* module = smd_get_module(i);

		if (! module->in_use) {
			continue;
		}

		cf_dyn_buf_append_string(db, module->name);
		cf_dyn_buf_append_char(db, ':');
		cf_dyn_buf_append_string(db, "committed_key=");
		cf_dyn_buf_append_uint64_x(db, module->cv_key);
		cf_dyn_buf_append_char(db, ',');
		cf_dyn_buf_append_string(db, "committed_tid=");
		cf_dyn_buf_append_uint64(db, module->cv_tid);
		cf_dyn_buf_append_char(db, ',');
		cf_dyn_buf_append_string(db, "n_keys=");
		cf_dyn_buf_append_uint32(db, cf_vector_size(&module->db));
		cf_dyn_buf_append_char(db, ',');
		cf_dyn_buf_append_string(db, "state=");
		cf_dyn_buf_append_string(db, state_str[module->state]);
		cf_dyn_buf_append_char(db, ';');
	}

	smd_unlock();
}
740
741
742//==========================================================
743// Local helpers - callbacks.
744//
745
746static int
747smd_msg_recv_cb(cf_node node_id, msg* m, void* udata)
748{
749 (void)udata;
750
751 smd_op* op = smd_op_create();
752
753 op->src = node_id;
754
755 if (! smd_msg_parse(m, op)) {
756 cf_warning(AS_SMD, "failed to parse msg op_type %d", op->type);
757 smd_op_destroy(op);
758 as_fabric_msg_put(m);
759 return -1;
760 }
761
762 as_fabric_msg_put(m);
763 cf_queue_push(&g_smd.event_q, &op);
764
765 return 0;
766}
767
768static void
769smd_cluster_changed_cb(const as_exchange_cluster_changed_event* ex_event,
770 void* udata)
771{
772 (void)udata;
773
774 size_t a_sz = ex_event->cluster_size * sizeof(cf_node);
775 cf_node* succession = cf_malloc(a_sz);
776
777 uint32_t* compatibility_ids = as_exchange_compatibility_ids();
778 uint32_t n_compatible = 0;
779
780 for (uint32_t n = 0; n < ex_event->cluster_size; n++) {
781 if (compatibility_ids[n] >= 1) {
782 succession[n_compatible++] = ex_event->succession[n];
783 }
784 }
785
786 smd_op* op = smd_op_create();
787
788 op->type = SMD_OP_CLUSTER_CHANGED;
789 op->cl_key = ex_event->cluster_key;
790
791 op->node_count = n_compatible;
792 op->succession = succession;
793
794 cf_queue_push(&g_smd.event_q, &op);
795}
796
// Set callback used by as_smd_set_blocking(): udata is the caller's cf_queue,
// and pushing the result onto it wakes the blocked caller.
static void
smd_set_blocking_cb(bool result, void* udata)
{
	cf_queue_push((cf_queue*)udata, &result);
}
802
803
804//==========================================================
805// Local helpers - parse fabric msg.
806//
807
808static bool
809smd_msg_parse(msg* m, smd_op* op)
810{
811 uint32_t version;
812
813 if (msg_get_uint32(m, SMD_MSG_VERSION, &version) == 0) {
814 cf_ticker_warning(AS_SMD, "incompatible msg version %u", version);
815 return false;
816 }
817
818 uint32_t type;
819
820 if (msg_get_uint32(m, SMD_MSG_OP, &type) != 0) {
821 cf_warning(AS_SMD, "msg missing op type");
822 return false;
823 }
824
825 op->type = (smd_op_type)type;
826
827 if (msg_get_uint64(m, SMD_MSG_CLUSTER_KEY, &op->cl_key) != 0) {
828 cf_warning(AS_SMD, "msg missing cluster key");
829 return false;
830 }
831
832 if (op->type == SMD_OP_REPORT_ALL_VERS_TO_PR) {
833 uint32_t count = (AS_SMD_NUM_MODULES + NUM_FUTURE_MODULES) * 3;
834 uint64_t versions[count];
835
836 if (! msg_msgpack_list_get_uint64_array(m, SMD_MSG_VERSION_LIST,
837 versions, &count) || count == 0 || count % 3 != 0) {
838 cf_warning(AS_SMD, "msg missing or invalid version list");
839 return false;
840 }
841
842 op->version_count = count / 3;
843 op->version_list = cf_malloc(count * sizeof(uint64_t));
844 memcpy(op->version_list, versions, count * sizeof(uint64_t));
845
846 return true;
847 }
848
849 uint32_t mod_id;
850
851 if (msg_get_uint32(m, SMD_MSG_MODULE_ID, &mod_id) != 0 ||
852 mod_id >= AS_SMD_NUM_MODULES) {
853 cf_detail(AS_SMD, "msg missing or unknown module id");
854 return false;
855 }
856
857 op->module = smd_get_module((as_smd_id)mod_id);
858
859 if (! op->module->in_use) {
860 cf_detail(AS_SMD, "module %s not in use", op->module->name);
861 return false;
862 }
863
864 switch (op->type) {
865 // To principal.
866 case SMD_OP_SET_TO_PR:
867 if (msg_get_uint64(m, SMD_MSG_TID, &op->tid) != 0) {
868 cf_warning(AS_SMD, "msg missing tid");
869 return false;
870 }
871 return smd_msg_parse_items(m, op);
872 case SMD_OP_REPORT_VER_TO_PR:
873 if (msg_get_uint64(m, SMD_MSG_COMMITTED_CL_KEY,
874 &op->committed_key) != 0) {
875 cf_warning(AS_SMD, "msg missing committed cluster key");
876 return false;
877 }
878 if (msg_get_uint64(m, SMD_MSG_TID, &op->tid) != 0) {
879 cf_warning(AS_SMD, "msg missing tid");
880 return false;
881 }
882 return true;
883 case SMD_OP_FULL_TO_PR:
884 if (msg_get_uint64(m, SMD_MSG_COMMITTED_CL_KEY,
885 &op->committed_key) != 0) {
886 cf_warning(AS_SMD, "msg missing committed cluster key");
887 return false;
888 }
889 if (msg_get_uint64(m, SMD_MSG_TID, &op->tid) != 0) {
890 cf_warning(AS_SMD, "msg missing tid");
891 return false;
892 }
893 return smd_msg_parse_items(m, op);
894 case SMD_OP_ACK_TO_PR:
895 if (msg_get_uint64(m, SMD_MSG_TID, &op->tid) != 0) {
896 cf_warning(AS_SMD, "msg missing tid");
897 return false;
898 }
899 return true;
900
901 // From principal.
902 case SMD_OP_SET_FROM_PR:
903 if (msg_get_uint64(m, SMD_MSG_TID, &op->tid) != 0) {
904 cf_warning(AS_SMD, "msg missing tid");
905 return false;
906 }
907 return smd_msg_parse_items(m, op);
908 case SMD_OP_REQ_VER_FROM_PR:
909 return true;
910 case SMD_OP_FULL_FROM_PR:
911 if (msg_get_uint64(m, SMD_MSG_COMMITTED_CL_KEY,
912 &op->committed_key) != 0) {
913 cf_warning(AS_SMD, "msg missing committed cluster key");
914 return false;
915 }
916 if (msg_get_uint64(m, SMD_MSG_TID, &op->tid) != 0) {
917 cf_warning(AS_SMD, "msg missing tid");
918 return false;
919 }
920 return smd_msg_parse_items(m, op);
921 case SMD_OP_REQ_FULL_FROM_PR:
922 return true;
923
924 case SMD_OP_SET_ACK:
925 case SMD_OP_SET_NACK:
926 if (msg_get_uint64(m, SMD_MSG_TID, &op->tid) != 0) {
927 cf_warning(AS_SMD, "msg missing tid");
928 return false;
929 }
930 return true;
931
932 default:
933 cf_warning(AS_SMD, "invalid type %d", op->type);
934 break;
935 }
936
937 return false;
938}
939
940static bool
941smd_msg_parse_items(msg* m, smd_op* op)
942{
943 char* key;
944
945 if (msg_get_str(m, SMD_MSG_SINGLE_KEY, &key, MSG_GET_DIRECT) == 0) {
946 char* value = NULL;
947 uint32_t gen = 0;
948 uint64_t ts = 0;
949
950 msg_get_str(m, SMD_MSG_SINGLE_VALUE, &value, MSG_GET_DIRECT);
951 msg_get_uint32(m, SMD_MSG_SINGLE_GENERATION, &gen);
952 msg_get_uint64(m, SMD_MSG_SINGLE_TIMESTAMP, &ts);
953
954 item_vec_init(&op->items, 1);
955 item_vec_append(&op->items, smd_item_create_copy(key, value, ts, gen));
956
957 return true;
958 }
959 // else - multiple items.
960
961 uint32_t count;
962
963 if (! msg_msgpack_list_get_count(m, SMD_MSG_KEY_LIST, &count)) {
964 cf_warning(AS_SMD, "msg missing key list");
965 return false;
966 }
967
968 if (count == 0) {
969 item_vec_init(&op->items, 0);
970 return true; // empty list - this can happen
971 }
972
973 uint32_t check;
974
975 if (! msg_msgpack_list_get_count(m, SMD_MSG_VALUE_LIST, &check) &&
976 check != count) {
977 cf_warning(AS_SMD, "msg items count mismatch");
978 return false;
979 }
980
981 if (! msg_get_uint64_array_count(m, SMD_MSG_TS_ARRAY, &check) &&
982 check != count) {
983 cf_warning(AS_SMD, "msg items count mismatch or missing ts array");
984 return false;
985 }
986
987 cf_vector_define(key_vec, sizeof(msg_buf_ele), count, 0);
988 cf_vector_define(value_vec, sizeof(msg_buf_ele), count, 0);
989 uint32_t gen_list[count];
990
991 if (! msg_msgpack_list_get_buf_array_presized(m, SMD_MSG_KEY_LIST,
992 &key_vec)) {
993 cf_warning(AS_SMD, "msg missing key list");
994 return false;
995 }
996
997 if (! msg_msgpack_list_get_buf_array_presized(m, SMD_MSG_VALUE_LIST,
998 &value_vec)) {
999 cf_warning(AS_SMD, "msg missing value list");
1000 return false;
1001 }
1002
1003 if (! msg_msgpack_list_get_uint32_array(m, SMD_MSG_GEN_LIST, gen_list,
1004 &check) && check != count) {
1005 cf_warning(AS_SMD, "msg missing gen list");
1006 return false;
1007 }
1008
1009 item_vec_init(&op->items, count);
1010
1011 for (uint32_t i = 0; i < count; i++) {
1012 msg_buf_ele* key_p = (msg_buf_ele*)cf_vector_getp(&key_vec, i);
1013 msg_buf_ele* val_p = (msg_buf_ele*)cf_vector_getp(&value_vec, i);
1014 uint64_t ts = 0;
1015
1016 msg_get_uint64_array(m, SMD_MSG_TS_ARRAY, i, &ts);
1017
1018 item_vec_append(&op->items, smd_item_create_handoff(
1019 cf_strndup((char*)key_p->ptr, key_p->sz),
1020 smd_item_value_ndup(val_p->ptr, val_p->sz), ts, gen_list[i]));
1021 }
1022
1023 return true;
1024}
1025
1026
1027//==========================================================
1028// Local helpers - event loop.
1029//
1030
1031static void*
1032run_smd(void* udata)
1033{
1034 (void)udata;
1035
1036 while (true) {
1037 smd_op* op = NULL;
1038 int wait_ms = set_orig_try_retransmit_or_expire();
1039
1040 if (smd_is_pr()) {
1041 int pr_wait_ms = pr_try_retransmit();
1042
1043 if (pr_wait_ms == INT_MAX) {
1044 cf_queue_pop(&g_smd.pending_set_q, &op, CF_QUEUE_NOWAIT);
1045 }
1046 else if (pr_wait_ms < wait_ms) {
1047 wait_ms = pr_wait_ms;
1048 }
1049 }
1050
1051 if (op == NULL) {
1052 cf_queue_pop(&g_smd.event_q, &op,
1053 wait_ms == INT_MAX ? CF_QUEUE_FOREVER : wait_ms);
1054 }
1055
1056 if (op != NULL) {
1057 smd_lock();
1058 smd_event(op);
1059 smd_unlock();
1060
1061 smd_op_destroy(op);
1062 }
1063 }
1064
1065 return NULL;
1066}
1067
1068static int
1069pr_try_retransmit(void)
1070{
1071 uint64_t next_ms = UINT64_MAX;
1072 uint64_t now_ms = cf_getms();
1073
1074 for (uint32_t i = 0; i < AS_SMD_NUM_MODULES; i++) {
1075 smd_module* module = smd_get_module((as_smd_id)i);
1076
1077 if (! module->in_use) {
1078 continue;
1079 }
1080
1081 if (module->retry_next_ms != 0 && module->retry_next_ms < now_ms) {
1082 pr_send_msgs(module);
1083 }
1084
1085 if (module->retry_next_ms != 0 && module->retry_next_ms < next_ms) {
1086 next_ms = module->retry_next_ms;
1087 }
1088 }
1089
1090 return next_ms == UINT64_MAX ? INT_MAX : (int)(next_ms - now_ms);
1091}
1092
1093static int
1094set_orig_try_retransmit_or_expire(void)
1095{
1096 set_orig_reduce_udata udata = {
1097 .wait_ms = INT_MAX,
1098 .now_ms = cf_getms()
1099 };
1100
1101 cf_shash_reduce(g_smd.set_h, set_orig_reduce_cb, &udata);
1102
1103 return udata.wait_ms;
1104}
1105
1106static int
1107set_orig_reduce_cb(const void* key, void* value, void* udata)
1108{
1109 smd_set_entry* entry = *(smd_set_entry**)value;
1110 set_orig_reduce_udata* p = (set_orig_reduce_udata*)udata;
1111
1112 if (entry->deadline_ms < p->now_ms) {
1113 if (entry->cb != NULL) {
1114 entry->cb(false, entry->udata);
1115 }
1116
1117 smd_set_entry_destroy(entry);
1118
1119 return CF_SHASH_REDUCE_DELETE;
1120 }
1121
1122 if (entry->retry_next_ms != 0 && entry->retry_next_ms < p->now_ms) {
1123 send_set_from_orig(*(uint32_t*)key, entry);
1124 }
1125
1126 uint64_t next_ms = entry->deadline_ms;
1127
1128 if (entry->retry_next_ms != 0 && entry->retry_next_ms < next_ms) {
1129 next_ms = entry->retry_next_ms;
1130 }
1131
1132 int wait_ms = (int)(next_ms - p->now_ms);
1133
1134 if (wait_ms < p->wait_ms) {
1135 p->wait_ms = wait_ms;
1136 }
1137
1138 return CF_SHASH_OK;
1139}
1140
1141static void
1142smd_event(smd_op* op)
1143{
1144 smd_module* module = op->module;
1145
1146 if (op->type == SMD_OP_CLUSTER_CHANGED) {
1147 OP_DETAIL("principal %lx -> %lx", g_smd.succession[0],
1148 op->succession[0]);
1149 op_cluster_changed(op);
1150 return;
1151 }
1152
1153 OP_DETAIL("source %lx", op->src);
1154
1155 if (op->type == SMD_OP_START_SET) {
1156 op_start_set(op);
1157 return;
1158 }
1159
1160 int node_index = index_of_node(g_smd.succession, g_smd.node_count, op->src);
1161
1162 switch (op->type) {
1163 case SMD_OP_SET_TO_PR:
1164 case SMD_OP_SET_ACK:
1165 case SMD_OP_SET_NACK:
1166 break; // these don't care about src or cluster key
1167 default:
1168 if (node_index < 0 || g_smd.cl_key != op->cl_key) {
1169 return;
1170 }
1171 }
1172
1173 op->node_index = (uint32_t)node_index;
1174
1175 switch (op->type) {
1176 // To principal.
1177 case SMD_OP_SET_TO_PR:
1178 op_set_to_pr(op);
1179 break;
1180 case SMD_OP_REPORT_ALL_VERS_TO_PR:
1181 op_report_all_vers_to_pr(op);
1182 break;
1183 case SMD_OP_REPORT_VER_TO_PR:
1184 op_report_ver_to_pr(op);
1185 break;
1186 case SMD_OP_FULL_TO_PR:
1187 op_full_to_pr(op);
1188 break;
1189 case SMD_OP_ACK_TO_PR:
1190 op_ack_to_pr(op);
1191 break;
1192
1193 // From principal.
1194 case SMD_OP_SET_FROM_PR:
1195 op_set_from_pr(op);
1196 break;
1197 case SMD_OP_REQ_VER_FROM_PR:
1198 op_req_ver_from_pr(op);
1199 break;
1200 case SMD_OP_FULL_FROM_PR:
1201 op_full_from_pr(op);
1202 break;
1203 case SMD_OP_REQ_FULL_FROM_PR:
1204 op_req_full_from_pr(op);
1205 break;
1206
1207 case SMD_OP_SET_ACK:
1208 op_finish_set(op, true);
1209 break;
1210 case SMD_OP_SET_NACK:
1211 op_finish_set(op, false);
1212 break;
1213
1214 default:
1215 cf_ticker_warning(AS_SMD, "invalid op %d", op->type);
1216 break;
1217 }
1218}
1219
1220
1221//==========================================================
1222// Local helpers - events.
1223//
1224
1225static void
1226op_cluster_changed(smd_op* op)
1227{
1228 cf_assert(op->node_count <= AS_CLUSTER_SZ, AS_SMD, "cluster count invalid %d > %d",
1229 g_smd.node_count, AS_CLUSTER_SZ);
1230
1231 bool was_pr = smd_is_pr();
1232
1233 g_smd.cl_key = op->cl_key;
1234 g_smd.node_count = op->node_count;
1235 memcpy(g_smd.succession, op->succession, sizeof(cf_node) * op->node_count);
1236
1237 for (uint32_t i = 0; i < AS_SMD_NUM_MODULES; i++) {
1238 smd_module* module = smd_get_module((as_smd_id)i);
1239
1240 if (! module->in_use) {
1241 continue;
1242 }
1243
1244 if (was_pr && module->state == STATE_SET) {
1245 send_set_reply(module, false);
1246 }
1247
1248 pr_clear_retry_msgs(module);
1249
1250 if (g_smd.node_count == 1) {
1251 OP_DETAIL("single node move to state %s", state_str[STATE_PR]);
1252
1253 module->state = STATE_PR;
1254 }
1255 else if (smd_is_pr()) {
1256 msg* m = as_fabric_msg_get(M_TYPE_SMD);
1257
1258 msg_set_uint32(m, SMD_MSG_OP, SMD_OP_REQ_VER_FROM_PR);
1259 msg_set_uint32(m, SMD_MSG_MODULE_ID, module->id);
1260 msg_set_uint64(m, SMD_MSG_CLUSTER_KEY, g_smd.cl_key);
1261
1262 pr_set_retry_msg(module, m);
1263 module->retry_next_ms = cf_getms() + SMD_RETRY_MS;
1264
1265 item_vec_destroy(&module->merge);
1266 item_vec_init(&module->merge, 16);
1267 smd_hash_clear(&module->merge_h);
1268
1269 OP_DETAIL("move to state %s", state_str[STATE_MERGING]);
1270
1271 module->state = STATE_MERGING;
1272 }
1273 else {
1274 OP_DETAIL("move to state %s", state_str[STATE_NPR]);
1275
1276 module->state = STATE_NPR;
1277 }
1278 }
1279
1280 if (! smd_is_pr()) {
1281 usleep(REPORT_VER_DELAY_US); // allow principal time to advance
1282
1283 send_report_all_ver_to_pr();
1284
1285 smd_op* pending_op;
1286
1287 while (cf_queue_pop(&g_smd.pending_set_q, &pending_op,
1288 CF_QUEUE_NOWAIT) == CF_QUEUE_OK) {
1289 smd_op_destroy(pending_op);
1290 }
1291 }
1292}
1293
1294static void
1295op_start_set(smd_op* op)
1296{
1297 if (++g_smd.set_tid == 0) {
1298 g_smd.set_tid = 1;
1299 }
1300
1301 as_smd_item* item = item_vec_get(&op->items, 0);
1302 uint64_t now_ms = cf_getms();
1303
1304 item_vec_set(&op->items, 0, NULL); // malloc handoff
1305
1306 smd_set_entry* entry = (smd_set_entry*)cf_malloc(sizeof(smd_set_entry));
1307
1308 entry->cl_key = g_smd.cl_key;
1309 entry->cb = op->set_cb;
1310 entry->udata = op->set_udata;
1311 entry->deadline_ms = now_ms + op->set_timeout;
1312 entry->retry_next_ms = 0;
1313 entry->item = item;
1314 entry->module = op->module;
1315
1316 cf_shash_put(g_smd.set_h, &g_smd.set_tid, &entry);
1317
1318 if (g_smd.node_count == 0) {
1319 entry->retry_next_ms = now_ms + SET_RETRY_MS;
1320 return;
1321 }
1322
1323 send_set_from_orig(g_smd.set_tid, entry);
1324}
1325
1326static void
1327op_set_to_pr(smd_op* op)
1328{
1329 smd_module* module = op->module;
1330
1331 if (cf_vector_size(&op->items) != 1) {
1332 cf_warning(AS_SMD, "bad msg item count %u", cf_vector_size(&op->items));
1333 return;
1334 }
1335
1336 if (smd_is_pr() && module->state != STATE_PR) {
1337 if (op->tid == module->set_tid && op->src == module->set_src &&
1338 op->cl_key == module->set_key) {
1339 OP_DETAIL("already setting - ignoring");
1340 return;
1341 }
1342
1343 if (pending_set_q_contains(op)) {
1344 OP_DETAIL("already pending - ignoring");
1345 return;
1346 }
1347
1348 OP_DETAIL("move to pending");
1349
1350 smd_op* new_op = smd_op_create();
1351
1352 smd_op_handoff(new_op, op);
1353 cf_queue_push(&g_smd.pending_set_q, &new_op);
1354 return;
1355 }
1356
1357 module->set_src = op->src;
1358 module->set_key = op->cl_key;
1359 module->set_tid = op->tid;
1360
1361 if (! smd_is_pr()) {
1362 OP_DETAIL("not principal - ignoring");
1363 send_set_reply(module, false);
1364 return;
1365 }
1366
1367 as_smd_item* item = item_vec_get(&op->items, 0);
1368
1369 if (module_set_pr(module, item->key, item->value)) { // malloc handoff
1370 smd_state next_state = g_smd.node_count == 1 ? STATE_PR : STATE_SET;
1371
1372 OP_DETAIL("new-key %s move to state %s", item->key,
1373 state_str[next_state]);
1374
1375 module->state = next_state;
1376
1377 if (g_smd.node_count == 1) {
1378 send_set_reply(module, true);
1379 }
1380 }
1381 else {
1382 send_set_reply(module, true);
1383 }
1384
1385 item->key = NULL;
1386 item->value = NULL;
1387}
1388
1389static void
1390op_report_all_vers_to_pr(smd_op* op)
1391{
1392 uint32_t count = op->version_count * 3;
1393 uint32_t ix = 0;
1394
1395 while (ix < count) {
1396 uint64_t module_id = op->version_list[ix++];
1397 uint64_t cv_key = op->version_list[ix++];
1398 uint64_t cv_tid = op->version_list[ix++];
1399
1400 if (module_id >= AS_SMD_NUM_MODULES) {
1401 cf_detail(AS_SMD, "unknown module %ld", module_id);
1402 continue;
1403 }
1404
1405 smd_module* module = smd_get_module((as_smd_id)module_id);
1406
1407 if (! module->in_use) {
1408 cf_detail(AS_SMD, "module %s not in use", module->name);
1409 continue;
1410 }
1411
1412 smd_op module_op = {
1413 .type = op->type,
1414 .node_index = op->node_index,
1415 .module = module,
1416 .committed_key = cv_key,
1417 .tid = cv_tid
1418 };
1419
1420 op_report_ver_to_pr(&module_op);
1421 }
1422}
1423
1424static void
1425op_report_ver_to_pr(smd_op* op)
1426{
1427 smd_module* module = op->module;
1428
1429 if (! pr_mark_reply(op, STATE_MERGING)) {
1430 return;
1431 }
1432
1433 bool pr_is_dirty = false;
1434
1435 if ((op->committed_key == module->cv_key && module->cv_key != 0) ||
1436 (op->committed_key == 0 && op->tid == 0)) {
1437 // Note - committed_key is zero on older nodes.
1438 module->merge_tids[op->node_index] = op->tid;
1439 }
1440 else {
1441 pr_clear_retry_msgs(module);
1442 pr_is_dirty = true;
1443 }
1444
1445 if (module->retry_msg_count != 0) {
1446 OP_DETAIL("pending replies %u", module->retry_msg_count);
1447 return;
1448 }
1449 // else - got all versions or is dirty.
1450
1451 bool npr_is_dirty[AS_CLUSTER_SZ] = { false };
1452 bool npr_has_dirty = false;
1453
1454 if (! pr_is_dirty) {
1455 for (uint32_t i = 1; i < g_smd.node_count; i++) {
1456 uint64_t tid = module->merge_tids[i];
1457
1458 if (tid < module->cv_tid) {
1459 npr_is_dirty[i] = true;
1460 npr_has_dirty = true;
1461 }
1462 else if (tid > module->cv_tid) {
1463 pr_is_dirty = true;
1464 break;
1465 }
1466 // else - both still clean.
1467 }
1468 }
1469
1470 if (pr_is_dirty) {
1471 OP_DETAIL("move to state %s", state_str[STATE_DIRTY]);
1472
1473 module->state = STATE_DIRTY;
1474
1475 msg* m = as_fabric_msg_get(M_TYPE_SMD);
1476
1477 msg_set_uint32(m, SMD_MSG_OP, SMD_OP_REQ_FULL_FROM_PR);
1478 msg_set_uint32(m, SMD_MSG_MODULE_ID, module->id);
1479
1480 msg_set_uint64(m, SMD_MSG_CLUSTER_KEY, g_smd.cl_key);
1481 msg_set_uint64(m, SMD_MSG_COMMITTED_CL_KEY, module->cv_key);
1482 msg_set_uint64(m, SMD_MSG_TID, module->cv_tid);
1483
1484 pr_set_retry_msg(module, m);
1485 pr_send_msgs(module);
1486 return;
1487 }
1488
1489 if (! npr_has_dirty) {
1490 OP_DETAIL("move to state %s", state_str[STATE_PR]);
1491
1492 module->state = STATE_PR;
1493 return;
1494 }
1495
1496 OP_DETAIL("move to state %s", state_str[STATE_CLEAN]);
1497
1498 module->state = STATE_CLEAN;
1499
1500 msg* full = as_fabric_msg_get(M_TYPE_SMD);
1501
1502 msg_set_uint32(full, SMD_MSG_OP, SMD_OP_FULL_FROM_PR);
1503 module_fill_msg(module, full);
1504
1505 module->retry_msg_count = 0;
1506 module->retry_msgs[0] = NULL;
1507
1508 for (uint32_t i = 1; i < g_smd.node_count; i++) {
1509 if (! npr_is_dirty[i]) {
1510 module->retry_msgs[i] = NULL;
1511 }
1512 else {
1513 msg_incr_ref(full);
1514 module->retry_msgs[i] = full;
1515 module->retry_msg_count++;
1516 }
1517 }
1518
1519 as_fabric_msg_put(full);
1520
1521 pr_send_msgs(module);
1522}
1523
1524static void
1525op_full_to_pr(smd_op* op)
1526{
1527 if (! pr_mark_reply(op, STATE_DIRTY)) {
1528 return;
1529 }
1530
1531 smd_module* module = op->module;
1532
1533 module_merge_list(module, &op->items);
1534
1535 if (module->retry_msg_count != 0) {
1536 OP_DETAIL("pending replies %u", module->retry_msg_count);
1537 return;
1538 }
1539
1540 for (uint32_t i = 0; i < cf_vector_size(&module->merge); i++) {
1541 as_smd_item* new_item = item_vec_get(&module->merge, i);
1542 uint32_t ix;
1543
1544 if (! smd_hash_get(&module->db_h, new_item->key, &ix)) { // new key
1545 module_append_item(module, new_item);
1546 continue;
1547 }
1548 // else - existing key.
1549
1550 item_vec_replace(&module->db, ix, new_item);
1551 }
1552
1553 if (cf_vector_size(&module->merge) != 0) {
1554 module_accept_list(module, &module->merge);
1555 item_vec_disown_items(&module->merge);
1556 }
1557
1558 module->cv_tid = 1;
1559 module->cv_key = g_smd.cl_key;
1560
1561 module_commit_to_disk(module);
1562 send_full_from_pr(module);
1563
1564 OP_DETAIL("n-items %u - move to state %s", cf_vector_size(&module->merge),
1565 state_str[STATE_CLEAN]);
1566
1567 module->state = STATE_CLEAN;
1568}
1569
1570static void
1571op_ack_to_pr(smd_op* op)
1572{
1573 smd_module* module = op->module;
1574
1575 if (module->state != STATE_CLEAN && module->state != STATE_SET) {
1576 return;
1577 }
1578
1579 if (op->tid != module->cv_tid) {
1580 OP_DETAIL("tid mismatch %lu != %lu", op->tid, module->cv_tid);
1581 return;
1582 }
1583
1584 if (! pr_mark_reply(op, module->state)) {
1585 return;
1586 }
1587
1588 if (module->retry_msg_count != 0) {
1589 OP_DETAIL("pending replies %u", module->retry_msg_count);
1590 return;
1591 }
1592 // else - got all acks.
1593
1594 if (module->state == STATE_SET) {
1595 send_set_reply(module, true);
1596 }
1597
1598 OP_DETAIL("move to state %s", state_str[STATE_PR]);
1599
1600 module->state = STATE_PR;
1601}
1602
1603static void
1604op_set_from_pr(smd_op* op)
1605{
1606 smd_module* module = op->module;
1607
1608 if (module->state != STATE_NPR) {
1609 return;
1610 }
1611
1612 if (op->node_index != 0) {
1613 cf_warning(AS_SMD, "set not from principal - src %lx", op->src);
1614 return;
1615 }
1616
1617 if (cf_vector_size(&op->items) != 1) {
1618 cf_warning(AS_SMD, "set items count %d != 1",
1619 cf_vector_size(&op->items));
1620 return;
1621 }
1622
1623 module->cv_key = g_smd.cl_key;
1624 module->cv_tid = op->tid;
1625
1626 module_set_npr(module, item_vec_get(&op->items, 0));
1627 item_vec_disown_items(&op->items);
1628
1629 send_ack_to_pr(op); // last, so item is accepted before originator acks app
1630}
1631
1632static void
1633op_req_ver_from_pr(smd_op* op)
1634{
1635 smd_module* module = op->module;
1636
1637 if (module->state != STATE_NPR) {
1638 return;
1639 }
1640
1641 send_report_ver_to_pr(module);
1642}
1643
1644static void
1645op_full_from_pr(smd_op* op)
1646{
1647 smd_module* module = op->module;
1648
1649 if (module->state != STATE_NPR) {
1650 return;
1651 }
1652
1653 if (op->node_index != 0) {
1654 cf_warning(AS_SMD, "set full not from principal - src %lx", op->src);
1655 return;
1656 }
1657
1658 send_ack_to_pr(op);
1659
1660 if (op->committed_key == module->cv_key && op->tid == module->cv_tid) {
1661 return; // normal on retransmits
1662 }
1663
1664 module->cv_key = op->committed_key;
1665 module->cv_tid = op->tid;
1666
1667 OP_DETAIL("replacing all");
1668
1669 cf_vector merge_list;
1670 item_vec_init(&merge_list, cf_vector_size(&op->items));
1671
1672 for (uint32_t i = 0; i < cf_vector_size(&op->items); i++) {
1673 as_smd_item* new_item = item_vec_get(&op->items, i);
1674 uint32_t ix;
1675
1676 if (! smd_hash_get(&module->db_h, new_item->key, &ix)) {
1677 item_vec_append(&merge_list, new_item);
1678 continue;
1679 }
1680
1681 const as_smd_item* item = item_vec_get_const(&module->db, ix);
1682
1683 if (smd_item_is_less(item, new_item)) {
1684 item_vec_append(&merge_list, new_item);
1685 }
1686 }
1687
1688 module_accept_list(module, &merge_list);
1689 item_vec_disown_items(&merge_list);
1690 item_vec_destroy(&merge_list);
1691
1692 item_vec_destroy(&module->db);
1693 item_vec_handoff(&module->db, &op->items);
1694
1695 module_regen_key2index(module);
1696
1697 module_commit_to_disk(module);
1698}
1699
1700static void
1701op_req_full_from_pr(smd_op* op)
1702{
1703 smd_module* module = op->module;
1704
1705 if (module->state != STATE_NPR) {
1706 return;
1707 }
1708
1709 OP_DETAIL("sending all");
1710
1711 msg* m = as_fabric_msg_get(M_TYPE_SMD);
1712
1713 msg_set_uint32(m, SMD_MSG_OP, SMD_OP_FULL_TO_PR);
1714 module_fill_msg(module, m);
1715
1716 if (as_fabric_send(g_smd.succession[0], m, AS_FABRIC_CHANNEL_META) !=
1717 AS_FABRIC_SUCCESS) {
1718 as_fabric_msg_put(m);
1719 }
1720}
1721
1722static void
1723op_finish_set(smd_op* op, bool success)
1724{
1725 smd_set_entry* entry;
1726 uint32_t tid = (uint32_t)op->tid;
1727
1728 if (cf_shash_get(g_smd.set_h, &tid, &entry) != CF_SHASH_OK) {
1729 cf_detail(AS_SMD, "set-tid %u not in hash", tid);
1730 return;
1731 }
1732
1733 if (op->cl_key != entry->cl_key) {
1734 cf_warning(AS_SMD, "mismatched cluster key %lx in set", op->cl_key);
1735 return;
1736 }
1737
1738 if (! success) {
1739 entry->retry_next_ms = cf_getms() + SET_RETRY_MS;
1740 return;
1741 }
1742
1743 int ret = cf_shash_delete(g_smd.set_h, &tid);
1744
1745 cf_assert(ret == CF_SHASH_OK, AS_SMD, "shash_delete");
1746
1747 if (entry->cb != NULL) {
1748 entry->cb(true, entry->udata);
1749 }
1750
1751 smd_set_entry_destroy(entry);
1752}
1753
1754
1755//==========================================================
1756// Local helpers - pending set queue.
1757//
1758
1759static bool
1760pending_set_q_contains(const smd_op* op)
1761{
1762 cf_queue_reduce(&g_smd.pending_set_q, pending_set_q_reduce_cb, &op);
1763
1764 return op == NULL; // op is set NULL if it is found
1765}
1766
1767static int
1768pending_set_q_reduce_cb(void* ptr, void* udata)
1769{
1770 const smd_op* op_in_q = *(const smd_op**)ptr;
1771 const smd_op** p_op = (const smd_op**)udata;
1772 const smd_op* op = *p_op;
1773
1774 if (op->tid == op_in_q->tid && op->src == op_in_q->src &&
1775 op->cl_key == op_in_q->cl_key) {
1776 *p_op = NULL;
1777 return -1; // found match - stop reduce
1778 }
1779
1780 return 0;
1781}
1782
1783
1784//==========================================================
1785// Local helpers - fabric msg send/reply.
1786//
1787
1788static void
1789send_set_from_pr(smd_module* module, const as_smd_item* item)
1790{
1791 module->cv_key = g_smd.cl_key;
1792 module->cv_tid++;
1793
1794 msg* m = as_fabric_msg_get(M_TYPE_SMD);
1795
1796 msg_set_uint32(m, SMD_MSG_OP, SMD_OP_SET_FROM_PR);
1797 msg_set_uint64(m, SMD_MSG_CLUSTER_KEY, g_smd.cl_key);
1798 msg_set_uint64(m, SMD_MSG_TID, module->cv_tid);
1799
1800 msg_set_uint32(m, SMD_MSG_MODULE_ID, module->id);
1801 msg_set_str(m, SMD_MSG_SINGLE_KEY, item->key, MSG_SET_COPY);
1802
1803 if (item->value != NULL) {
1804 msg_set_str(m, SMD_MSG_SINGLE_VALUE, item->value, MSG_SET_COPY);
1805 }
1806
1807 msg_set_uint32(m, SMD_MSG_SINGLE_GENERATION, item->generation);
1808 msg_set_uint64(m, SMD_MSG_SINGLE_TIMESTAMP, item->timestamp);
1809
1810 pr_set_retry_msg(module, m);
1811 pr_send_msgs(module);
1812}
1813
1814static void
1815send_full_from_pr(smd_module* module)
1816{
1817 msg* m = as_fabric_msg_get(M_TYPE_SMD);
1818
1819 msg_set_uint32(m, SMD_MSG_OP, SMD_OP_FULL_FROM_PR);
1820 module_fill_msg(module, m);
1821
1822 pr_set_retry_msg(module, m);
1823 pr_send_msgs(module);
1824}
1825
1826static void
1827send_report_all_ver_to_pr(void)
1828{
1829 msg* m = as_fabric_msg_get(M_TYPE_SMD);
1830
1831 msg_set_uint32(m, SMD_MSG_OP, SMD_OP_REPORT_ALL_VERS_TO_PR);
1832
1833 msg_set_uint64(m, SMD_MSG_CLUSTER_KEY, g_smd.cl_key);
1834
1835 uint32_t count = 0;
1836 uint64_t versions[AS_SMD_NUM_MODULES * 3];
1837
1838 for (uint32_t i = 0; i < AS_SMD_NUM_MODULES; i++) {
1839 smd_module* module = smd_get_module((as_smd_id)i);
1840
1841 if (module->in_use) {
1842 versions[count++] = (uint64_t)module->id;
1843 versions[count++] = module->cv_key;
1844 versions[count++] = module->cv_tid;
1845 }
1846 }
1847
1848 msg_msgpack_list_set_uint64(m, SMD_MSG_VERSION_LIST, versions, count);
1849
1850 if (as_fabric_send(g_smd.succession[0], m, AS_FABRIC_CHANNEL_META) !=
1851 AS_FABRIC_SUCCESS) {
1852 as_fabric_msg_put(m);
1853 }
1854}
1855
1856static void
1857send_report_ver_to_pr(smd_module* module)
1858{
1859 msg* m = as_fabric_msg_get(M_TYPE_SMD);
1860
1861 msg_set_uint32(m, SMD_MSG_OP, SMD_OP_REPORT_VER_TO_PR);
1862 msg_set_uint32(m, SMD_MSG_MODULE_ID, module->id);
1863
1864 msg_set_uint64(m, SMD_MSG_CLUSTER_KEY, g_smd.cl_key);
1865
1866 msg_set_uint64(m, SMD_MSG_COMMITTED_CL_KEY, module->cv_key);
1867 msg_set_uint64(m, SMD_MSG_TID, module->cv_tid);
1868
1869 if (as_fabric_send(g_smd.succession[0], m, AS_FABRIC_CHANNEL_META) !=
1870 AS_FABRIC_SUCCESS) {
1871 as_fabric_msg_put(m);
1872 }
1873}
1874
1875static void
1876send_ack_to_pr(smd_op* op)
1877{
1878 msg* m = as_fabric_msg_get(M_TYPE_SMD);
1879
1880 msg_set_uint32(m, SMD_MSG_OP, SMD_OP_ACK_TO_PR);
1881 msg_set_uint32(m, SMD_MSG_MODULE_ID, op->module->id);
1882
1883 msg_set_uint64(m, SMD_MSG_CLUSTER_KEY, op->cl_key);
1884 msg_set_uint64(m, SMD_MSG_TID, op->tid);
1885
1886 if (as_fabric_send(g_smd.succession[0], m, AS_FABRIC_CHANNEL_META) !=
1887 AS_FABRIC_SUCCESS) {
1888 as_fabric_msg_put(m);
1889 }
1890}
1891
1892static void
1893send_set_reply(smd_module* module, bool success)
1894{
1895 if (module->set_src == g_config.self_node) {
1896 smd_op op = {
1897 .cl_key = module->set_key,
1898 .tid = module->set_tid,
1899 };
1900
1901 op_finish_set(&op, success);
1902 return;
1903 }
1904
1905 msg* m = as_fabric_msg_get(M_TYPE_SMD);
1906
1907 msg_set_uint64(m, SMD_MSG_TID, module->set_tid);
1908 msg_set_uint64(m, SMD_MSG_CLUSTER_KEY, module->set_key);
1909 msg_set_uint32(m, SMD_MSG_OP, success ? SMD_OP_SET_ACK : SMD_OP_SET_NACK);
1910
1911 msg_set_uint32(m, SMD_MSG_MODULE_ID, (uint32_t)module->id);
1912
1913 if (as_fabric_send(module->set_src, m, AS_FABRIC_CHANNEL_META) !=
1914 AS_FABRIC_SUCCESS) {
1915 as_fabric_msg_put(m);
1916 }
1917}
1918
1919static void
1920send_set_from_orig(uint32_t set_tid, smd_set_entry* entry)
1921{
1922 const as_smd_item* item = entry->item;
1923
1924 if (smd_is_pr()) {
1925 smd_op op = {
1926 .type = SMD_OP_SET_TO_PR,
1927 .src = g_config.self_node,
1928 .module = entry->module,
1929 .cl_key = entry->cl_key,
1930 .tid = set_tid
1931 };
1932
1933 item_vec_init(&op.items, 1);
1934 item_vec_set(&op.items, 0,
1935 smd_item_create_copy(item->key, item->value, 0, 0));
1936
1937 // Set this before calling op_set_to_pr() - call may destroy entry.
1938 // (Also - call won't alter retry_next_ms since we are calling as pr.)
1939 entry->retry_next_ms = 0;
1940
1941 op_set_to_pr(&op);
1942 item_vec_destroy(&op.items);
1943
1944 return;
1945 }
1946
1947 msg* m = as_fabric_msg_get(M_TYPE_SMD);
1948
1949 msg_set_uint64(m, SMD_MSG_TID, set_tid);
1950 msg_set_uint64(m, SMD_MSG_CLUSTER_KEY, entry->cl_key);
1951 msg_set_uint32(m, SMD_MSG_OP, SMD_OP_SET_TO_PR);
1952
1953 msg_set_uint32(m, SMD_MSG_MODULE_ID, (uint32_t)entry->module->id);
1954 msg_set_str(m, SMD_MSG_SINGLE_KEY, item->key, MSG_SET_COPY);
1955
1956 if (item->value != NULL) {
1957 msg_set_str(m, SMD_MSG_SINGLE_VALUE, item->value, MSG_SET_COPY);
1958 }
1959
1960 if (as_fabric_send(g_smd.succession[0], m, AS_FABRIC_CHANNEL_META) !=
1961 AS_FABRIC_SUCCESS) {
1962 as_fabric_msg_put(m);
1963 }
1964
1965 entry->retry_next_ms = cf_getms() + SET_RETRY_MS;
1966}
1967
1968
1969//==========================================================
1970// Local helpers - fabric msg retransmit.
1971//
1972
1973static void
1974pr_send_msgs(smd_module* module)
1975{
1976 if (module->retry_msg_count == 0) {
1977 module->retry_next_ms = 0;
1978 return;
1979 }
1980
1981 for (uint32_t i = 1; i < g_smd.node_count; i++) {
1982 if (module->retry_msgs[i] == NULL) {
1983 continue;
1984 }
1985
1986 msg_incr_ref(module->retry_msgs[i]);
1987
1988 if (as_fabric_send(g_smd.succession[i], module->retry_msgs[i],
1989 AS_FABRIC_CHANNEL_META) != AS_FABRIC_SUCCESS) {
1990 as_fabric_msg_put(module->retry_msgs[i]);
1991 }
1992 }
1993
1994 module->retry_next_ms = cf_getms() + SMD_RETRY_MS;
1995}
1996
1997static void
1998pr_set_retry_msg(smd_module* module, msg* m)
1999{
2000 module->retry_msgs[0] = NULL;
2001
2002 for (uint32_t i = 1; i < g_smd.node_count; i++) {
2003 msg_incr_ref(m);
2004 module->retry_msgs[i] = m;
2005 }
2006
2007 as_fabric_msg_put(m);
2008 module->retry_msg_count = g_smd.node_count - 1;
2009 module->retry_next_ms = 0;
2010}
2011
2012// Return false when already marked or state doesn't match.
2013static bool
2014pr_mark_reply(smd_op* op, smd_state state)
2015{
2016 smd_module* module = op->module;
2017
2018 if (module->state != state) {
2019 OP_DETAIL("wrong state %u", state);
2020 return false;
2021 }
2022
2023 if (module->retry_msgs[op->node_index] == NULL) {
2024 OP_DETAIL("already marked");
2025 return false; // ignore retransmit
2026 }
2027
2028 as_fabric_msg_put(module->retry_msgs[op->node_index]);
2029 module->retry_msgs[op->node_index] = NULL;
2030 module->retry_msg_count--;
2031
2032 if (module->retry_msg_count == 0) {
2033 module->retry_next_ms = 0;
2034 }
2035
2036 return true;
2037}
2038
2039static void
2040pr_clear_retry_msgs(smd_module* module)
2041{
2042 for (uint32_t i = 1; i < g_smd.node_count; i++) {
2043 if (module->retry_msgs[i] != NULL) {
2044 as_fabric_msg_put(module->retry_msgs[i]);
2045 module->retry_msgs[i] = NULL;
2046 }
2047 }
2048
2049 module->retry_msg_count = 0;
2050 module->retry_next_ms = 0;
2051}
2052
2053
2054//==========================================================
2055// Local helpers - call module accept_cb.
2056//
2057
2058static void
2059module_accept_item(smd_module* module, const as_smd_item* item)
2060{
2061 item_vec_define(vec, 1);
2062
2063 item_vec_append(&vec, item);
2064 module->accept_cb(&vec, AS_SMD_ACCEPT_OPT_SET);
2065}
2066
2067static void
2068module_accept_list(smd_module* module, const cf_vector* list)
2069{
2070 module->accept_cb(list, AS_SMD_ACCEPT_OPT_SET);
2071}
2072
2073static void
2074module_accept_startup(smd_module* module)
2075{
2076 uint32_t count = cf_vector_size(&module->db);
2077 cf_vector vec;
2078
2079 item_vec_init(&vec, count);
2080
2081 for (uint32_t i = 0; i < count; i++) {
2082 const as_smd_item* item = item_vec_get_const(&module->db, i);
2083
2084 if (item->value != NULL) {
2085 item_vec_append(&vec, item);
2086 }
2087 }
2088
2089 module->accept_cb(&vec, AS_SMD_ACCEPT_OPT_START);
2090 item_vec_disown_items(&vec);
2091 item_vec_destroy(&vec);
2092}
2093
2094
2095//==========================================================
2096// Local helpers - module.
2097//
2098
2099static void
2100module_regen_key2index(smd_module* module)
2101{
2102 smd_hash_clear(&module->db_h);
2103
2104 for (uint32_t i = 0; i < cf_vector_size(&module->db); i++) {
2105 const char* key = item_vec_get_const(&module->db, i)->key;
2106
2107 smd_hash_put(&module->db_h, key, i);
2108 }
2109}
2110
2111static void
2112module_append_item(smd_module* module, as_smd_item* item)
2113{
2114 smd_hash_put(&module->db_h, item->key, cf_vector_size(&module->db));
2115 item_vec_append(&module->db, item);
2116}
2117
2118static void
2119module_fill_msg(smd_module* module, msg* m)
2120{
2121 msg_set_uint64(m, SMD_MSG_CLUSTER_KEY, g_smd.cl_key);
2122
2123 msg_set_uint32(m, SMD_MSG_MODULE_ID, module->id);
2124
2125 msg_set_uint64(m, SMD_MSG_COMMITTED_CL_KEY, module->cv_key);
2126 msg_set_uint64(m, SMD_MSG_TID, module->cv_tid);
2127
2128 uint32_t count = cf_vector_size(&module->db);
2129
2130 cf_vector_define(key_vec, sizeof(msg_buf_ele), count, 0);
2131 cf_vector_define(val_vec, sizeof(msg_buf_ele), count, 0);
2132 uint32_t gen_list[count];
2133
2134 msg_set_uint64_array_size(m, SMD_MSG_TS_ARRAY, count);
2135
2136 for (uint32_t i = 0; i < count; i++) {
2137 const as_smd_item* item = item_vec_get_const(&module->db, i);
2138
2139 msg_buf_ele key_e = {
2140 .sz = (uint32_t)strlen(item->key),
2141 .ptr = (uint8_t*)item->key
2142 };
2143
2144 cf_vector_append(&key_vec, &key_e);
2145
2146 msg_buf_ele val_e = {
2147 .sz = item->value != NULL ? (uint32_t)strlen(item->value) : 0,
2148 .ptr = (uint8_t*)item->value
2149 };
2150
2151 cf_vector_append(&val_vec, &val_e);
2152
2153 gen_list[i] = item->generation;
2154 msg_set_uint64_array(m, SMD_MSG_TS_ARRAY, i, item->timestamp);
2155 }
2156
2157 msg_msgpack_list_set_buf(m, SMD_MSG_KEY_LIST, &key_vec);
2158 msg_msgpack_list_set_buf(m, SMD_MSG_VALUE_LIST, &val_vec);
2159 msg_msgpack_list_set_uint32(m, SMD_MSG_GEN_LIST, gen_list, count);
2160}
2161
2162static void
2163module_merge_list(smd_module* module, cf_vector* list)
2164{
2165 smd_hash* orig_hash = &module->db_h;
2166 smd_hash* merge_hash = &module->merge_h;
2167
2168 for (uint32_t i = 0; i < cf_vector_size(list); i++) {
2169 as_smd_item* new_item = item_vec_get(list, i);
2170 uint32_t ix;
2171
2172 if (smd_hash_get(merge_hash, new_item->key, &ix)) {
2173 const as_smd_item* item = item_vec_get_const(&module->merge, ix);
2174 bool has_tombstone = new_item->value == NULL || item->value == NULL;
2175 as_smd_conflict_fn cb = has_tombstone ?
2176 smd_item_is_less : module->conflict_cb;
2177
2178 if (! cb(item, new_item)) {
2179 continue;
2180 }
2181
2182 item_vec_replace(&module->merge, ix, new_item);
2183 item_vec_set(list, i, NULL);
2184 continue;
2185 }
2186
2187 if (smd_hash_get(orig_hash, new_item->key, &ix)) {
2188 const as_smd_item* item = item_vec_get_const(&module->db, ix);
2189 bool has_tombstone = new_item->value == NULL || item->value == NULL;
2190 as_smd_conflict_fn cb = has_tombstone ?
2191 smd_item_is_less : module->conflict_cb;
2192
2193 if (! cb(item, new_item)) {
2194 continue;
2195 }
2196 }
2197
2198 // New merge_hash key.
2199 smd_hash_put(merge_hash, new_item->key, cf_vector_size(&module->merge));
2200 item_vec_append(&module->merge, new_item);
2201
2202 item_vec_set(list, i, NULL);
2203 }
2204}
2205
2206static void
2207module_set_npr(smd_module* module, as_smd_item* item)
2208{
2209 uint32_t ix;
2210
2211 if (! smd_hash_get(&module->db_h, item->key, &ix)) { // new key
2212 OP_TYPE_DETAIL(SMD_OP_SET_FROM_PR, "new key %s", item->key);
2213
2214 module_append_item(module, item);
2215 module_commit_to_disk(module);
2216 module_accept_item(module, item);
2217 return;
2218 }
2219 // else - existing key.
2220
2221 const as_smd_item* old = item_vec_get_const(&module->db, ix);
2222
2223 if (item->generation != old->generation ||
2224 item->timestamp != old->timestamp) {
2225 OP_TYPE_DETAIL(SMD_OP_SET_FROM_PR, "key %s", item->key);
2226
2227 item_vec_replace(&module->db, ix, item);
2228
2229 module_commit_to_disk(module);
2230 module_accept_item(module, item);
2231 return;
2232 }
2233
2234 OP_TYPE_DETAIL(SMD_OP_SET_FROM_PR, "key %s - ignoring unchanged value",
2235 item->key);
2236
2237 smd_item_destroy(item);
2238}
2239
2240// key and value are malloc handoffs
2241static bool
2242module_set_pr(smd_module* module, char* key, char* value)
2243{
2244 uint32_t ix;
2245
2246 if (! smd_hash_get(&module->db_h, key, &ix)) { // new key
2247 as_smd_item* item = smd_item_create_handoff(key, value,
2248 cf_clepoch_milliseconds(), 1);
2249
2250 module_append_item(module, item);
2251 send_set_from_pr(module, item);
2252 module_commit_to_disk(module);
2253 module_accept_item(module, item);
2254 return true;
2255 }
2256 // else - existing key.
2257
2258 as_smd_item* item = item_vec_get(&module->db, ix);
2259 bool has_tombstone = item->value == NULL || value == NULL;
2260
2261 if (! has_tombstone) {
2262 if (module->conflict_cb != smd_item_is_less) {
2263 as_smd_item check_item = { .key = key, .value = value };
2264
2265 if (! module->conflict_cb(item, &check_item)) {
2266 OP_TYPE_DETAIL(SMD_OP_SET_TO_PR, "key %s - module rejected item",
2267 key);
2268 cf_free(key);
2269 smd_item_value_destroy(value);
2270 return false;
2271 }
2272 }
2273
2274 if (strcmp(item->value, value) == 0) { // ignore if same value
2275 OP_TYPE_DETAIL(SMD_OP_SET_TO_PR, "key %s - rejected unchanged item",
2276 item->key);
2277 cf_free(key);
2278 smd_item_value_destroy(value);
2279 return false;
2280 }
2281 }
2282 else if (item->value == value) { // i.e. both are NULL
2283 OP_TYPE_DETAIL(SMD_OP_SET_TO_PR, "key %s - rejected unchanged tombstone",
2284 item->key);
2285 cf_free(key);
2286 smd_item_value_destroy(value);
2287 return false;
2288 }
2289
2290 cf_free(key);
2291 smd_item_value_destroy(item->value);
2292
2293 item->value = value; // malloc handoff
2294 item->generation++;
2295 item->timestamp = cf_clepoch_milliseconds();
2296
2297 send_set_from_pr(module, item);
2298 module_commit_to_disk(module);
2299 module_accept_item(module, item);
2300
2301 return true;
2302}
2303
2304static void
2305module_restore_from_disk(smd_module* module)
2306{
2307 module->cv_key = 0;
2308 module->cv_tid = 0;
2309
2310 char smd_path[MAX_PATH_LEN];
2311
2312 sprintf(smd_path, "%s/smd/%s.smd", g_config.work_directory, module->name);
2313
2314 struct stat buf;
2315 int ret = stat(smd_path, &buf);
2316
2317 if (ret != 0) {
2318 if (ret == ENOENT) {
2319 cf_crash(AS_SMD, "failed to read file '%s' module '%s': %s (%d)",
2320 smd_path, module->name, cf_strerror(errno), errno);
2321 }
2322
2323 cf_info(AS_SMD, "no file '%s' - starting empty", smd_path);
2324 item_vec_init(&module->db, 0);
2325 return;
2326 }
2327
2328 size_t load_flags = JSON_REJECT_DUPLICATES;
2329 json_error_t json_error;
2330 json_t* j_file = json_load_file(smd_path, load_flags, &json_error);
2331
2332 if (j_file == NULL) {
2333 cf_warning(AS_SMD, "invalid file '%s' - module '%s' with JSON error %s source %s line %d column %d position %d",
2334 smd_path, module->name, json_error.text, json_error.source,
2335 json_error.line, json_error.column, json_error.position);
2336 item_vec_init(&module->db, 0);
2337 return;
2338 }
2339
2340 if (! json_is_array(j_file)) {
2341 cf_warning(AS_SMD, "invalid file '%s' - starting empty", smd_path);
2342 json_decref(j_file);
2343 item_vec_init(&module->db, 0);
2344 return;
2345 }
2346
2347 size_t num_items = json_array_size(j_file);
2348
2349 if (num_items == 0) {
2350 json_decref(j_file);
2351 item_vec_init(&module->db, 0);
2352 return;
2353 }
2354
2355 json_t* j_item = json_array_get(j_file, 0);
2356 uint32_t start = 0;
2357
2358 if (json_is_array(j_item)) {
2359 start = 1;
2360
2361 json_t* j_ck = json_array_get(j_item, 0);
2362 json_t* j_tid = json_array_get(j_item, 1);
2363
2364 if (j_ck != NULL && j_tid != NULL) {
2365 module->cv_key = (uint64_t)json_integer_value(j_ck);
2366 module->cv_tid = (uint64_t)json_integer_value(j_tid);
2367 }
2368 }
2369 else {
2370 module->cv_tid = 1; // key 0 tid 1 means old SMD db with entries > 0
2371 }
2372
2373 cf_detail(AS_SMD, "{%s} module_restore_from_disk",
2374 MODULE_AS_STRING(module));
2375
2376 item_vec_init(&module->db, (uint32_t)num_items - start);
2377
2378 for (uint32_t i = start; i < num_items; i++) {
2379 j_item = json_array_get(j_file, i);
2380 cf_assert(json_is_object(j_item), AS_SMD, "invalid file '%s'",
2381 smd_path);
2382
2383 const char* key = json_string_value(json_object_get(j_item, "key"));
2384 cf_assert(key != NULL, AS_SMD, "invalid file '%s'", smd_path);
2385
2386 json_t* j_value = json_object_get(j_item, "value");
2387 cf_assert(j_value != NULL && (json_is_string(j_value) ||
2388 json_is_null(j_value)), AS_SMD, "invalid file '%s'", smd_path);
2389
2390 json_t* j_gen = json_object_get(j_item, "generation");
2391 cf_assert(j_gen != NULL && json_is_integer(j_gen), AS_SMD, "invalid file '%s'",
2392 smd_path);
2393
2394 json_t* j_ts = json_object_get(j_item, "timestamp");
2395 cf_assert(j_ts != NULL && json_is_integer(j_ts), AS_SMD, "invalid file '%s'",
2396 smd_path);
2397
2398 item_vec_set(&module->db, i - start,
2399 smd_item_create_copy(key, json_string_value(j_value),
2400 (uint64_t)json_integer_value(j_ts),
2401 (uint32_t)json_integer_value(j_gen)));
2402 }
2403
2404 json_decref(j_file);
2405 module_regen_key2index(module);
2406}
2407
2408static void
2409module_commit_to_disk(smd_module* module)
2410{
2411 json_t* j_file = json_array();
2412 cf_assert(j_file, AS_SMD, "failed to create json array");
2413
2414 json_t* j_ver = json_array();
2415 cf_assert(j_ver, AS_SMD, "failed to create json array");
2416
2417 JSON_ENFORCE(json_array_append_new(j_ver,
2418 json_integer((json_int_t)module->cv_key)));
2419 JSON_ENFORCE(json_array_append_new(j_ver,
2420 json_integer((json_int_t)module->cv_tid)));
2421
2422 JSON_ENFORCE(json_array_append_new(j_file, j_ver));
2423
2424 for (uint32_t i = 0; i < cf_vector_size(&module->db); i++) {
2425 json_t* j_item = json_object();
2426 const as_smd_item* item = item_vec_get_const(&module->db, i);
2427
2428 cf_assert(j_item, AS_SMD, "failed to create json object");
2429
2430 JSON_ENFORCE(json_object_set_new(j_item, "key",
2431 json_string(item->key)));
2432
2433 if (item->value == NULL) {
2434 JSON_ENFORCE(json_object_set_new(j_item, "value", json_null()));
2435 }
2436 else {
2437 JSON_ENFORCE(json_object_set_new(j_item, "value",
2438 json_string(item->value)));
2439 }
2440
2441 JSON_ENFORCE(json_object_set_new(j_item, "generation",
2442 json_integer(item->generation)));
2443 JSON_ENFORCE(json_object_set_new(j_item, "timestamp",
2444 json_integer((json_int_t)item->timestamp)));
2445
2446 JSON_ENFORCE(json_array_append_new(j_file, j_item));
2447 }
2448
2449 char smd_path[MAX_PATH_LEN];
2450 char smd_save_path[MAX_PATH_LEN + 5];
2451 size_t flags = JSON_INDENT(3) | JSON_ENSURE_ASCII | JSON_PRESERVE_ORDER;
2452
2453 sprintf(smd_path, "%s/smd/%s.smd", g_config.work_directory, module->name);
2454 sprintf(smd_save_path, "%s.save", smd_path);
2455
2456 if (json_dump_file(j_file, smd_save_path, flags) != 0) {
2457 cf_warning(AS_SMD, "failed dump for module '%s' to file '%s': %s (%d)",
2458 module->name, smd_path, cf_strerror(errno), errno);
2459 json_decref(j_file);
2460 return;
2461 }
2462
2463 json_decref(j_file);
2464
2465 if (rename(smd_save_path, smd_path) != 0) {
2466 cf_warning(AS_SMD, "error on renaming existing file '%s': %s (%d)",
2467 smd_save_path, cf_strerror(errno), errno);
2468 }
2469}
2470
2471static void
2472module_set_default_items(smd_module* module, const cf_vector* default_items)
2473{
2474 if (default_items == NULL) {
2475 return;
2476 }
2477
2478 for (uint32_t i = 0; i < cf_vector_size(default_items); i++) {
2479 const as_smd_item* item = item_vec_get_const(default_items, i);
2480
2481 if (! smd_hash_get(&module->db_h, item->key, NULL)) { // new key
2482 // Timestamp 0 means this loses to any non-default version.
2483 module_append_item(module,
2484 smd_item_create_copy(item->key, item->value, 0, 1));
2485 }
2486 }
2487}
2488
2489
2490//==========================================================
2491// Local helpers - hash.
2492//
2493
2494static void
2495smd_hash_init(smd_hash* h)
2496{
2497 memset((void*)h->table, 0, sizeof(h->table));
2498}
2499
2500static void
2501smd_hash_clear(smd_hash* h)
2502{
2503 for (uint32_t i = 0; i < N_HASH_ROWS; i++) {
2504 smd_hash_ele* e = h->table[i].next;
2505
2506 while (e != NULL) {
2507 smd_hash_ele* t = e->next;
2508 cf_free(e);
2509 e = t;
2510 }
2511 }
2512
2513 memset((void*)h->table, 0, sizeof(h->table));
2514}
2515
2516static void
2517smd_hash_put(smd_hash* h, const char* key, uint32_t value)
2518{
2519 smd_hash_ele* e_head = &h->table[smd_hash_get_row_i(key)];
2520
2521 // Nobody in row yet so just set that first element
2522 if (e_head->key == NULL) {
2523 e_head->key = key;
2524 e_head->value = value;
2525 return;
2526 }
2527 // else - allocate new element and insert next to head. Note - boldly
2528 // assume this key is not already in the hash.
2529
2530 smd_hash_ele* e = (smd_hash_ele*)cf_malloc(sizeof(smd_hash_ele));
2531
2532 e->key = key;
2533 e->value = value;
2534
2535 e->next = e_head->next;
2536 e_head->next = e;
2537}
2538
2539// Functions as a "has" if called with a null value.
2540static bool
2541smd_hash_get(const smd_hash* h, const char* key, uint32_t* value)
2542{
2543 const smd_hash_ele* e = &h->table[smd_hash_get_row_i(key)];
2544
2545 if (e->key == NULL) {
2546 return false;
2547 }
2548
2549 while (e) {
2550 if (strcmp(e->key, key) == 0) {
2551 if (value) {
2552 *value = e->value;
2553 }
2554
2555 return true;
2556 }
2557
2558 e = e->next;
2559 }
2560
2561 return false;
2562}
2563
2564static uint32_t
2565smd_hash_get_row_i(const char* key)
2566{
2567 uint64_t hashed_key = cf_hash_fnv32((const uint8_t*)key, strlen(key));
2568
2569 return (uint32_t)(hashed_key % N_HASH_ROWS);
2570}
2571
2572
2573//==========================================================
2574// Local helpers - as_smd_item.
2575//
2576
2577static as_smd_item*
2578smd_item_create_copy(const char* key, const char* value, uint64_t ts,
2579 uint32_t gen)
2580{
2581 return smd_item_create_handoff(cf_strdup(key), smd_item_value_dup(value),
2582 ts, gen);
2583}
2584
2585static as_smd_item*
2586smd_item_create_handoff(char* key, char* value, uint64_t ts, uint32_t gen)
2587{
2588 as_smd_item* item = cf_malloc(sizeof(as_smd_item));
2589
2590 item->key = key;
2591 item->value = value;
2592 item->timestamp = ts;
2593 item->generation = gen;
2594
2595 return item;
2596}
2597
2598static bool
2599smd_item_is_less(const as_smd_item* item0, const as_smd_item* item1)
2600{
2601 return item0->timestamp < item1->timestamp ||
2602 (item0->timestamp == item1->timestamp &&
2603 item0->generation < item1->generation);
2604}
2605
2606static void
2607smd_item_destroy(as_smd_item* item)
2608{
2609 if (item != NULL) {
2610 cf_free(item->key);
2611 smd_item_value_destroy(item->value);
2612 cf_free(item);
2613 }
2614}
2615
2616static char*
2617smd_item_value_ndup(uint8_t* value, uint32_t sz)
2618{
2619 if (value == NULL) {
2620 return NULL;
2621 }
2622
2623 return sz == 0 ?
2624 (char*)smd_empty_value : cf_strndup((const char*)value, sz);
2625}
2626
2627static char*
2628smd_item_value_dup(const char* value)
2629{
2630 if (value == NULL) {
2631 return NULL;
2632 }
2633
2634 return value[0] == '\0' ? (char*)smd_empty_value : cf_strdup(value);
2635}
2636
2637static void
2638smd_item_value_destroy(char* value)
2639{
2640 if (value != smd_empty_value) {
2641 cf_free(value);
2642 }
2643}
2644