1/*
2 * migrate.c
3 *
4 * Copyright (C) 2008-2018 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "fabric/migrate.h"
28
29#include <stdbool.h>
30#include <stddef.h>
31#include <stdint.h>
32#include <stdio.h>
33#include <string.h>
34#include <sys/syscall.h>
35#include <unistd.h>
36
37#include "citrusleaf/alloc.h"
38#include "citrusleaf/cf_atomic.h"
39#include "citrusleaf/cf_clock.h"
40#include "citrusleaf/cf_digest.h"
41#include "citrusleaf/cf_queue.h"
42
43#include "cf_mutex.h"
44#include "cf_thread.h"
45#include "fault.h"
46#include "msg.h"
47#include "node.h"
48#include "rchash.h"
49#include "shash.h"
50
51#include "base/cfg.h"
52#include "base/datamodel.h"
53#include "base/index.h"
54#include "base/proto.h"
55#include "fabric/exchange.h"
56#include "fabric/fabric.h"
57#include "fabric/meta_batch.h"
58#include "fabric/partition.h"
59#include "fabric/partition_balance.h"
60#include "storage/flat.h"
61#include "storage/storage.h"
62
63
64//==========================================================
65// Typedefs & constants.
66//
67
// Fabric message template for migrate messages - gives the wire type of each
// MIG_FIELD_* field. Registered for M_TYPE_MIGRATE in as_migrate_init().
// Fields named MIG_FIELD_UNUSED_* still occupy a slot in the template.
const msg_template migrate_mt[] = {
 { MIG_FIELD_OP, M_FT_UINT32 },
 { MIG_FIELD_UNUSED_1, M_FT_UINT32 },
 { MIG_FIELD_EMIG_ID, M_FT_UINT32 },
 { MIG_FIELD_NAMESPACE, M_FT_BUF },
 { MIG_FIELD_PARTITION, M_FT_UINT32 },
 { MIG_FIELD_DIGEST, M_FT_BUF },
 { MIG_FIELD_GENERATION, M_FT_UINT32 },
 { MIG_FIELD_RECORD, M_FT_BUF },
 { MIG_FIELD_CLUSTER_KEY, M_FT_UINT64 },
 { MIG_FIELD_UNUSED_9, M_FT_BUF },
 { MIG_FIELD_VOID_TIME, M_FT_UINT32 },
 { MIG_FIELD_UNUSED_11, M_FT_UINT32 },
 { MIG_FIELD_UNUSED_12, M_FT_BUF },
 { MIG_FIELD_INFO, M_FT_UINT32 },
 { MIG_FIELD_UNUSED_14, M_FT_UINT64 },
 { MIG_FIELD_UNUSED_15, M_FT_BUF },
 { MIG_FIELD_UNUSED_16, M_FT_BUF },
 { MIG_FIELD_UNUSED_17, M_FT_UINT32 },
 { MIG_FIELD_UNUSED_18, M_FT_UINT32 },
 { MIG_FIELD_LAST_UPDATE_TIME, M_FT_UINT64 },
 { MIG_FIELD_FEATURES, M_FT_UINT32 },
 { MIG_FIELD_UNUSED_21, M_FT_UINT32 },
 { MIG_FIELD_META_RECORDS, M_FT_BUF },
 { MIG_FIELD_META_SEQUENCE, M_FT_UINT32 },
 { MIG_FIELD_META_SEQUENCE_FINAL, M_FT_UINT32 },
 { MIG_FIELD_PARTITION_SIZE, M_FT_UINT64 },
 { MIG_FIELD_SET_NAME, M_FT_BUF },
 { MIG_FIELD_KEY, M_FT_BUF },
 { MIG_FIELD_UNUSED_28, M_FT_UINT32 },
 { MIG_FIELD_EMIG_INSERT_ID, M_FT_UINT64 }
};

// Build-time check that the template has exactly NUM_MIG_FIELDS entries.
COMPILER_ASSERT(sizeof(migrate_mt) / sizeof(msg_template) == NUM_MIG_FIELDS);
102
// Scratch size passed to as_fabric_register_msg_fn() for migrate messages.
#define MIG_MSG_SCRATCH_SIZE 192

// Delay before an emigration that was refused with ACK_EAGAIN is moved from the
// slow queue back onto the main emigration queue.
#define EMIGRATION_SLOW_Q_WAIT_MS 1000 // 1 second
// Retransmit interval, and ack-wait timeout, for START and DONE requests.
#define MIGRATE_RETRANSMIT_STARTDONE_MS 1000 // for now, not configurable
// Retransmit interval, and ack-wait timeout, for ALL_DONE signals.
#define MIGRATE_RETRANSMIT_SIGNAL_MS 1000 // for now, not configurable
// Value of emig->bytes_emigrating above which emigrate_tree_reduce_fn() pauses
// before moving on to the next record.
#define MAX_BYTES_EMIGRATING (16 * 1024 * 1024)

// How long after done_recv_ms an immigration stays in the hash before the
// reaper deletes it.
#define IMMIGRATION_DEBOUNCE_MS (60 * 1000) // 1 minute
111
// Result of emigration_send_start().
typedef enum {
 EMIG_START_RESULT_OK, // dest replied START_ACK_OK
 EMIG_START_RESULT_ERROR, // cluster key changed, or dest replied START_ACK_FAIL
 EMIG_START_RESULT_EAGAIN // dest replied START_ACK_EAGAIN - caller requeues
} emigration_start_result;
117
// State shared between the tree-reducing thread and the reinserter thread of
// one emigration (see emigrate_tree() and run_emigration_reinserter()).
typedef enum {
 // Order matters - we use an atomic set-max that relies on it.
 EMIG_STATE_ACTIVE, // tree reduce in progress
 EMIG_STATE_FINISHED, // tree reduce completed without abort
 EMIG_STATE_ABORTED // cluster key changed while emigrating
} emigration_state;
124
// Best-candidate bookkeeping carried through one cf_queue_reduce_pop() pass -
// see emigration_pop() and emigration_pop_reduce_fn().
typedef struct emigration_pop_info_s {
 uint32_t order; // namespace migrate_order - lower wins
 uint64_t dest_score; // dest minus avoid_dest (unsigned) - higher wins
 uint32_t type; // 2 = lead, 1 = contingent, 0 = other - higher wins
 uint64_t n_elements; // index tree size - smaller wins

 uint64_t avoid_dest; // g_avoid_dest as sampled at first scored element; 0 = not yet sampled
} emigration_pop_info;
133
// Value type of emig->reinsert_hash, keyed by insert id - one entry per record
// message placed there by emigrate_record().
typedef struct emigration_reinsert_ctrl_s {
 uint64_t xmit_ms; // time of last xmit - 0 when done
 emigration *emig; // emigration this record message belongs to
 msg *m; // the record message - the hash holds its own reference
} emigration_reinsert_ctrl;
139
140
141//==========================================================
142// Globals.
143//
144
// Emigrations being processed, keyed by emigration id (uint32_t).
cf_rchash *g_emigration_hash = NULL;
// Immigrations, keyed by immigration_hkey and hashed on its emig_id.
cf_rchash *g_immigration_hash = NULL;
// Queue of emigration pointers awaiting a run_emigration() thread. A NULL
// element tells one thread to exit.
cf_queue g_emigration_q;

// Destination of the most recent pop candidate - starts as our own node id.
// Used by emigration_pop_reduce_fn() to score destinations.
static uint64_t g_avoid_dest = 0;
// Source of emigration ids - incremented for each new emigration.
static cf_atomic32 g_emigration_id = 0;
// Emigrations refused with ACK_EAGAIN, waiting to be pushed back onto
// g_emigration_q by run_emigration_slow().
static cf_queue g_emigration_slow_q;
152
153
154//==========================================================
155// Forward declarations.
156//
157
// Prototypes for helpers used in this file, grouped by role.

// Various initializers and destructors.
void emigration_init(emigration *emig);
void emigration_destroy(void *parm);
int emigration_reinsert_destroy_reduce_fn(const void *key, void *data, void *udata);
void immigration_destroy(void *parm);

// Emigration.
void *run_emigration(void *arg);
void *run_emigration_slow(void *arg);
void emigration_pop(emigration **emigp);
int emigration_pop_reduce_fn(void *buf, void *udata);
void emigration_hash_insert(emigration *emig);
void emigration_hash_delete(emigration *emig);
bool emigrate_transfer(emigration *emig);
void emigrate_signal(emigration *emig);
emigration_start_result emigration_send_start(emigration *emig);
bool emigrate_tree(emigration *emig);
bool emigration_send_done(emigration *emig);
void *run_emigration_reinserter(void *arg);
void emigrate_tree_reduce_fn(as_index_ref *r_ref, void *udata);
void emigrate_fill_msg(as_storage_rd *rd, msg *m);
void old_emigrate_fill_msg(as_storage_rd *rd, msg *m);
int emigration_reinsert_reduce_fn(const void *key, void *data, void *udata);
void emigrate_record(emigration *emig, msg *m);

// Immigration.
uint32_t immigration_hashfn(const void *value, uint32_t value_len);
void *run_immigration_reaper(void *arg);
int immigration_reaper_reduce_fn(const void *key, uint32_t keylen, void *object, void *udata);

// Migrate fabric message handling.
int migrate_receive_msg_cb(cf_node src, msg *m, void *udata);
void immigration_handle_start_request(cf_node src, msg *m);
void immigration_ack_start_request(cf_node src, msg *m, uint32_t op);
void immigration_handle_insert_request(cf_node src, msg *m);
void immigration_handle_old_insert_request(cf_node src, msg *m);
void immigration_handle_done_request(cf_node src, msg *m);
void immigration_handle_all_done_request(cf_node src, msg *m);
void emigration_handle_insert_ack(cf_node src, msg *m);
void emigration_handle_ctrl_ack(cf_node src, msg *m, uint32_t op);

// Info API helpers.
int emigration_dump_reduce_fn(const void *key, uint32_t keylen, void *object, void *udata);
int immigration_dump_reduce_fn(const void *key, uint32_t keylen, void *object, void *udata);
202
203
204//==========================================================
205// Public API.
206//
207
208void
209as_migrate_init()
210{
211 g_avoid_dest = (uint64_t)g_config.self_node;
212
213 cf_queue_init(&g_emigration_q, sizeof(emigration*), 4096, true);
214 cf_queue_init(&g_emigration_slow_q, sizeof(emigration*), 4096, true);
215
216 g_emigration_hash = cf_rchash_create(cf_rchash_fn_u32, emigration_destroy,
217 sizeof(uint32_t), 64, CF_RCHASH_MANY_LOCK);
218
219 g_immigration_hash = cf_rchash_create(immigration_hashfn,
220 immigration_destroy, sizeof(immigration_hkey), 64,
221 CF_RCHASH_BIG_LOCK);
222
223 // Looks like an as_priority_thread_pool, but the reduce-pop is different.
224 for (uint32_t i = 0; i < g_config.n_migrate_threads; i++) {
225 cf_thread_create_detached(run_emigration, NULL);
226 }
227
228 cf_thread_create_detached(run_emigration_slow, NULL);
229 cf_thread_create_detached(run_immigration_reaper, NULL);
230
231 emigrate_fill_queue_init();
232
233 as_fabric_register_msg_fn(M_TYPE_MIGRATE, migrate_mt, sizeof(migrate_mt),
234 MIG_MSG_SCRATCH_SIZE, migrate_receive_msg_cb, NULL);
235}
236
237
238// Kicks off an emigration.
239void
240as_migrate_emigrate(const pb_task *task)
241{
242 emigration *emig = cf_rc_alloc(sizeof(emigration));
243
244 emig->dest = task->dest;
245 emig->cluster_key = task->cluster_key;
246 emig->id = cf_atomic32_incr(&g_emigration_id);
247 emig->type = task->type;
248 emig->tx_flags = task->tx_flags;
249 emig->state = EMIG_STATE_ACTIVE;
250 emig->aborted = false;
251
252 // Create these later only when we need them - we'll get lots at once.
253 emig->bytes_emigrating = 0;
254 emig->reinsert_hash = NULL;
255 emig->insert_id = 0;
256 emig->ctrl_q = NULL;
257 emig->meta_q = NULL;
258
259 as_partition_reserve(task->ns, task->pid, &emig->rsv);
260
261 emig->from_replica = is_self_replica(emig->rsv.p);
262
263 cf_atomic_int_incr(&emig->rsv.ns->migrate_tx_instance_count);
264
265 emigrate_queue_push(emig);
266}
267
268
269// Called via info command. Caller has sanity-checked n_threads.
270void
271as_migrate_set_num_xmit_threads(uint32_t n_threads)
272{
273 if (g_config.n_migrate_threads > n_threads) {
274 // Decrease the number of migrate transmit threads to n_threads.
275 while (g_config.n_migrate_threads > n_threads) {
276 void *death_msg = NULL;
277
278 // Send terminator (NULL message).
279 cf_queue_push(&g_emigration_q, &death_msg);
280 g_config.n_migrate_threads--;
281 }
282 }
283 else {
284 // Increase the number of migrate transmit threads to n_threads.
285 while (g_config.n_migrate_threads < n_threads) {
286 cf_thread_create_detached(run_emigration, NULL);
287 g_config.n_migrate_threads++;
288 }
289 }
290}
291
292
293// Called via info command - print information about migration to the log.
294void
295as_migrate_dump(bool verbose)
296{
297 cf_info(AS_MIGRATE, "migration info:");
298 cf_info(AS_MIGRATE, "---------------");
299 cf_info(AS_MIGRATE, "number of emigrations in g_emigration_hash: %d",
300 cf_rchash_get_size(g_emigration_hash));
301 cf_info(AS_MIGRATE, "number of requested emigrations waiting in g_emigration_q : %d",
302 cf_queue_sz(&g_emigration_q));
303 cf_info(AS_MIGRATE, "number of requested emigrations waiting in g_emigration_slow_q : %d",
304 cf_queue_sz(&g_emigration_slow_q));
305 cf_info(AS_MIGRATE, "number of immigrations in g_immigration_hash: %d",
306 cf_rchash_get_size(g_immigration_hash));
307 cf_info(AS_MIGRATE, "current emigration id: %d", g_emigration_id);
308
309 if (verbose) {
310 int item_num = 0;
311
312 if (cf_rchash_get_size(g_emigration_hash) > 0) {
313 cf_info(AS_MIGRATE, "contents of g_emigration_hash:");
314 cf_info(AS_MIGRATE, "------------------------------");
315
316 cf_rchash_reduce(g_emigration_hash, emigration_dump_reduce_fn,
317 &item_num);
318 }
319
320 if (cf_rchash_get_size(g_immigration_hash) > 0) {
321 item_num = 0;
322
323 cf_info(AS_MIGRATE, "contents of g_immigration_hash:");
324 cf_info(AS_MIGRATE, "-------------------------------");
325
326 cf_rchash_reduce(g_immigration_hash, immigration_dump_reduce_fn,
327 &item_num);
328 }
329 }
330}
331
332
333//==========================================================
334// Local helpers - various initializers and destructors.
335//
336
337void
338emigration_init(emigration *emig)
339{
340 emig->reinsert_hash = cf_shash_create(cf_shash_fn_u32, sizeof(uint64_t),
341 sizeof(emigration_reinsert_ctrl), 16 * 1024, CF_SHASH_MANY_LOCK);
342 emig->ctrl_q = cf_queue_create(sizeof(int), true);
343 emig->meta_q = meta_in_q_create();
344}
345
346
347// Destructor handed to rchash.
348void
349emigration_destroy(void *parm)
350{
351 emigration *emig = (emigration *)parm;
352
353 if (emig->reinsert_hash) {
354 cf_shash_reduce(emig->reinsert_hash,
355 emigration_reinsert_destroy_reduce_fn, NULL);
356 cf_shash_destroy(emig->reinsert_hash);
357 }
358
359 if (emig->ctrl_q) {
360 cf_queue_destroy(emig->ctrl_q);
361 }
362
363 if (emig->meta_q) {
364 meta_in_q_destroy(emig->meta_q);
365 }
366
367 as_partition_release(&emig->rsv);
368
369 cf_atomic_int_decr(&emig->rsv.ns->migrate_tx_instance_count);
370}
371
372
373int
374emigration_reinsert_destroy_reduce_fn(const void *key, void *data, void *udata)
375{
376 emigration_reinsert_ctrl *ri_ctrl = (emigration_reinsert_ctrl *)data;
377
378 as_fabric_msg_put(ri_ctrl->m);
379
380 return CF_SHASH_REDUCE_DELETE;
381}
382
383
384void
385emigration_release(emigration *emig)
386{
387 if (cf_rc_release(emig) == 0) {
388 emigration_destroy((void *)emig);
389 cf_rc_free(emig);
390 }
391}
392
393
394// Destructor handed to rchash.
395void
396immigration_destroy(void *parm)
397{
398 immigration *immig = (immigration *)parm;
399
400 if (immig->rsv.p) {
401 as_partition_release(&immig->rsv);
402 }
403
404 if (immig->meta_q) {
405 meta_out_q_destroy(immig->meta_q);
406 }
407
408 cf_atomic_int_decr(&immig->ns->migrate_rx_instance_count);
409}
410
411
412void
413immigration_release(immigration *immig)
414{
415 if (cf_rc_release(immig) == 0) {
416 immigration_destroy((void *)immig);
417 cf_rc_free(immig);
418 }
419}
420
421
422//==========================================================
423// Local helpers - emigration.
424//
425
426void *
427run_emigration(void *arg)
428{
429 while (true) {
430 emigration *emig;
431
432 emigration_pop(&emig);
433
434 // This is the case for intentionally stopping the migrate thread.
435 if (! emig) {
436 break; // signal of death
437 }
438
439 as_partition_balance_emigration_yield();
440
441 if (emig->cluster_key != as_exchange_cluster_key()) {
442 emigration_hash_delete(emig);
443 continue;
444 }
445
446 as_namespace *ns = emig->rsv.ns;
447 bool requeued = false;
448
449 // Add the emigration to the global hash so acks can find it.
450 emigration_hash_insert(emig);
451
452 switch (emig->type) {
453 case PB_TASK_EMIG_TRANSFER:
454 cf_atomic_int_incr(&ns->migrate_tx_partitions_active);
455 requeued = emigrate_transfer(emig);
456 cf_atomic_int_decr(&ns->migrate_tx_partitions_active);
457 break;
458 case PB_TASK_EMIG_SIGNAL_ALL_DONE:
459 cf_atomic_int_incr(&ns->migrate_signals_active);
460 emigrate_signal(emig);
461 cf_atomic_int_decr(&ns->migrate_signals_active);
462 break;
463 default:
464 cf_crash(AS_MIGRATE, "bad emig type %u", emig->type);
465 break;
466 }
467
468 if (! requeued) {
469 emigration_hash_delete(emig);
470 }
471 }
472
473 return NULL;
474}
475
476
477void *
478run_emigration_slow(void *arg)
479{
480 while (true) {
481 emigration *emig;
482
483 if (cf_queue_pop(&g_emigration_slow_q, (void *)&emig,
484 CF_QUEUE_FOREVER) != CF_QUEUE_OK) {
485 cf_crash(AS_MIGRATE, "emigration slow queue pop failed");
486 }
487
488 uint64_t now_ms = cf_getms();
489
490 if (emig->wait_until_ms > now_ms) {
491 usleep(1000 * (emig->wait_until_ms - now_ms));
492 }
493
494 cf_queue_push(&g_emigration_q, &emig);
495 }
496
497 return NULL;
498}
499
500
501void
502emigration_pop(emigration **emigp)
503{
504 emigration_pop_info best;
505
506 best.order = 0xFFFFffff;
507 best.dest_score = 0;
508 best.type = 0;
509 best.n_elements = 0xFFFFffffFFFFffff;
510
511 best.avoid_dest = 0;
512
513 if (cf_queue_reduce_pop(&g_emigration_q, (void *)emigp, CF_QUEUE_FOREVER,
514 emigration_pop_reduce_fn, &best) != CF_QUEUE_OK) {
515 cf_crash(AS_MIGRATE, "emigration queue reduce pop failed");
516 }
517}
518
519
520int
521emigration_pop_reduce_fn(void *buf, void *udata)
522{
523 emigration_pop_info *best = (emigration_pop_info *)udata;
524 emigration *emig = *(emigration **)buf;
525
526 if (! emig || // null emig terminates thread
527 emig->cluster_key != as_exchange_cluster_key()) {
528 return -1; // process immediately
529 }
530
531 if (emig->ctrl_q && cf_queue_sz(emig->ctrl_q) > 0) {
532 // This emig was requeued after its start command got an ACK_EAGAIN,
533 // likely because dest hit 'migrate-max-num-incoming'. A new ack has
534 // arrived - if it's ACK_OK, don't leave remote node hanging.
535
536 return -1; // process immediately
537 }
538
539 if (emig->type == PB_TASK_EMIG_SIGNAL_ALL_DONE) {
540 return -1; // process immediately
541 }
542
543 if (best->avoid_dest == 0) {
544 best->avoid_dest = g_avoid_dest;
545 }
546
547 uint32_t order = emig->rsv.ns->migrate_order;
548 uint64_t dest_score = (uint64_t)emig->dest - best->avoid_dest;
549 uint32_t type = (emig->tx_flags & TX_FLAGS_LEAD) != 0 ?
550 2 : ((emig->tx_flags & TX_FLAGS_CONTINGENT) != 0 ? 1 : 0);
551 uint64_t n_elements = as_index_tree_size(emig->rsv.tree);
552
553 if (order < best->order ||
554 (order == best->order &&
555 (dest_score > best->dest_score ||
556 (dest_score == best->dest_score &&
557 (type > best->type ||
558 (type == best->type &&
559 n_elements < best->n_elements)))))) {
560 best->order = order;
561 best->dest_score = dest_score;
562 best->type = type;
563 best->n_elements = n_elements;
564
565 g_avoid_dest = (uint64_t)emig->dest;
566
567 return -2; // candidate
568 }
569
570 return 0; // not interested
571}
572
573
574void
575emigration_hash_insert(emigration *emig)
576{
577 if (! emig->ctrl_q) {
578 emigration_init(emig); // creates emig->ctrl_q etc.
579
580 cf_rchash_put(g_emigration_hash, (void *)&emig->id, sizeof(emig->id),
581 (void *)emig);
582 }
583}
584
585
586void
587emigration_hash_delete(emigration *emig)
588{
589 if (emig->ctrl_q) {
590 cf_rchash_delete(g_emigration_hash, (void *)&emig->id,
591 sizeof(emig->id));
592 }
593 else {
594 emigration_release(emig);
595 }
596}
597
598
599bool
600emigrate_transfer(emigration *emig)
601{
602 //--------------------------------------------
603 // Send START request.
604 //
605
606 emigration_start_result result = emigration_send_start(emig);
607
608 if (result == EMIG_START_RESULT_EAGAIN) {
609 // Remote node refused migration, requeue and fetch another.
610 emig->wait_until_ms = cf_getms() + EMIGRATION_SLOW_Q_WAIT_MS;
611
612 cf_queue_push(&g_emigration_slow_q, &emig);
613
614 return true; // requeued
615 }
616
617 if (result != EMIG_START_RESULT_OK) {
618 return false; // did not requeue
619 }
620
621 //--------------------------------------------
622 // Send whole tree - may block a while.
623 //
624
625 if (! emigrate_tree(emig)) {
626 return false; // did not requeue
627 }
628
629 //--------------------------------------------
630 // Send DONE request.
631 //
632
633 if (emigration_send_done(emig)) {
634 as_partition_emigrate_done(emig->rsv.ns, emig->rsv.p->id,
635 emig->cluster_key, emig->dest, emig->tx_flags);
636 }
637
638 return false; // did not requeue
639}
640
641
642void
643emigrate_signal(emigration *emig)
644{
645 as_namespace *ns = emig->rsv.ns;
646 msg *m = as_fabric_msg_get(M_TYPE_MIGRATE);
647
648 switch (emig->type) {
649 case PB_TASK_EMIG_SIGNAL_ALL_DONE:
650 msg_set_uint32(m, MIG_FIELD_OP, OPERATION_ALL_DONE);
651 break;
652 default:
653 cf_crash(AS_MIGRATE, "signal: bad emig type %u", emig->type);
654 break;
655 }
656
657 msg_set_uint32(m, MIG_FIELD_EMIG_ID, emig->id);
658 msg_set_uint64(m, MIG_FIELD_CLUSTER_KEY, emig->cluster_key);
659 msg_set_buf(m, MIG_FIELD_NAMESPACE, (const uint8_t *)ns->name,
660 strlen(ns->name), MSG_SET_COPY);
661 msg_set_uint32(m, MIG_FIELD_PARTITION, emig->rsv.p->id);
662
663 uint64_t signal_xmit_ms = 0;
664
665 while (true) {
666 if (emig->cluster_key != as_exchange_cluster_key()) {
667 as_fabric_msg_put(m);
668 return;
669 }
670
671 uint64_t now = cf_getms();
672
673 if (signal_xmit_ms + MIGRATE_RETRANSMIT_SIGNAL_MS < now) {
674 as_fabric_retransmit(emig->dest, m,
675 AS_FABRIC_CHANNEL_CTRL);
676 signal_xmit_ms = now;
677 }
678
679 int op;
680
681 if (cf_queue_pop(emig->ctrl_q, &op, MIGRATE_RETRANSMIT_SIGNAL_MS) ==
682 CF_QUEUE_OK) {
683 switch (op) {
684 case OPERATION_ALL_DONE_ACK:
685 as_partition_signal_done(ns, emig->rsv.p->id,
686 emig->cluster_key);
687 as_fabric_msg_put(m);
688 return;
689 default:
690 cf_warning(AS_MIGRATE, "signal: unexpected ctrl op %d", op);
691 break;
692 }
693 }
694 }
695}
696
697
698emigration_start_result
699emigration_send_start(emigration *emig)
700{
701 as_namespace *ns = emig->rsv.ns;
702 msg *m = as_fabric_msg_get(M_TYPE_MIGRATE);
703
704 msg_set_uint32(m, MIG_FIELD_OP, OPERATION_START);
705 msg_set_uint32(m, MIG_FIELD_FEATURES, MY_MIG_FEATURES);
706 msg_set_uint64(m, MIG_FIELD_PARTITION_SIZE,
707 as_index_tree_size(emig->rsv.tree));
708 msg_set_uint32(m, MIG_FIELD_EMIG_ID, emig->id);
709 msg_set_uint64(m, MIG_FIELD_CLUSTER_KEY, emig->cluster_key);
710 msg_set_buf(m, MIG_FIELD_NAMESPACE, (const uint8_t *)ns->name,
711 strlen(ns->name), MSG_SET_COPY);
712 msg_set_uint32(m, MIG_FIELD_PARTITION, emig->rsv.p->id);
713
714 uint64_t start_xmit_ms = 0;
715
716 while (true) {
717 if (emig->cluster_key != as_exchange_cluster_key()) {
718 as_fabric_msg_put(m);
719 return EMIG_START_RESULT_ERROR;
720 }
721
722 uint64_t now = cf_getms();
723
724 if (cf_queue_sz(emig->ctrl_q) == 0 &&
725 start_xmit_ms + MIGRATE_RETRANSMIT_STARTDONE_MS < now) {
726 as_fabric_retransmit(emig->dest, m,
727 AS_FABRIC_CHANNEL_CTRL);
728 start_xmit_ms = now;
729 }
730
731 int op;
732
733 if (cf_queue_pop(emig->ctrl_q, &op, MIGRATE_RETRANSMIT_STARTDONE_MS) ==
734 CF_QUEUE_OK) {
735 switch (op) {
736 case OPERATION_START_ACK_OK:
737 as_fabric_msg_put(m);
738 return EMIG_START_RESULT_OK;
739 case OPERATION_START_ACK_EAGAIN:
740 as_fabric_msg_put(m);
741 return EMIG_START_RESULT_EAGAIN;
742 case OPERATION_START_ACK_FAIL:
743 cf_warning(AS_MIGRATE, "imbalance: dest refused migrate with ACK_FAIL");
744 cf_atomic_int_incr(&ns->migrate_tx_partitions_imbalance);
745 as_fabric_msg_put(m);
746 return EMIG_START_RESULT_ERROR;
747 default:
748 cf_warning(AS_MIGRATE, "unexpected ctrl op %d", op);
749 break;
750 }
751 }
752 }
753
754 // Should never get here.
755 cf_crash(AS_MIGRATE, "unexpected - exited infinite while loop");
756
757 return EMIG_START_RESULT_ERROR;
758}
759
760
761bool
762emigrate_tree(emigration *emig)
763{
764 if (as_index_tree_size(emig->rsv.tree) == 0) {
765 return true;
766 }
767
768 cf_atomic32_set(&emig->state, EMIG_STATE_ACTIVE);
769
770 cf_tid tid = cf_thread_create_joinable(run_emigration_reinserter,
771 (void*)emig);
772
773 as_index_reduce(emig->rsv.tree, emigrate_tree_reduce_fn, emig);
774
775 // Sets EMIG_STATE_FINISHED only if not already EMIG_STATE_ABORTED.
776 cf_atomic32_setmax(&emig->state, EMIG_STATE_FINISHED);
777
778 cf_thread_join(tid);
779
780 return emig->state != EMIG_STATE_ABORTED;
781}
782
783
784bool
785emigration_send_done(emigration *emig)
786{
787 as_namespace *ns = emig->rsv.ns;
788
789 if (! as_partition_pre_emigrate_done(ns, emig->rsv.p->id, emig->cluster_key,
790 emig->tx_flags)) {
791 return false;
792 }
793
794 msg *m = as_fabric_msg_get(M_TYPE_MIGRATE);
795
796 msg_set_uint32(m, MIG_FIELD_OP, OPERATION_DONE);
797 msg_set_uint32(m, MIG_FIELD_EMIG_ID, emig->id);
798
799 uint64_t done_xmit_ms = 0;
800
801 while (true) {
802 if (emig->cluster_key != as_exchange_cluster_key()) {
803 as_fabric_msg_put(m);
804 return false;
805 }
806
807 uint64_t now = cf_getms();
808
809 if (done_xmit_ms + MIGRATE_RETRANSMIT_STARTDONE_MS < now) {
810 as_fabric_retransmit(emig->dest, m,
811 AS_FABRIC_CHANNEL_CTRL);
812 done_xmit_ms = now;
813 }
814
815 int op;
816
817 if (cf_queue_pop(emig->ctrl_q, &op, MIGRATE_RETRANSMIT_STARTDONE_MS) ==
818 CF_QUEUE_OK) {
819 if (op == OPERATION_DONE_ACK) {
820 as_fabric_msg_put(m);
821 return true;
822 }
823 }
824 }
825
826 // Should never get here.
827 cf_crash(AS_MIGRATE, "unexpected - exited infinite while loop");
828
829 return false;
830}
831
832
833void *
834run_emigration_reinserter(void *arg)
835{
836 emigration *emig = (emigration *)arg;
837 emigration_state emig_state;
838
839 // Reduce over the reinsert hash until finished.
840 while ((emig_state = cf_atomic32_get(emig->state)) != EMIG_STATE_ABORTED) {
841 if (emig->cluster_key != as_exchange_cluster_key()) {
842 cf_atomic32_set(&emig->state, EMIG_STATE_ABORTED);
843 return NULL;
844 }
845
846 usleep(1000);
847
848 if (cf_shash_get_size(emig->reinsert_hash) == 0) {
849 if (emig_state == EMIG_STATE_FINISHED) {
850 return NULL;
851 }
852
853 continue;
854 }
855
856 cf_shash_reduce(emig->reinsert_hash, emigration_reinsert_reduce_fn,
857 (void *)cf_getms());
858 }
859
860 return NULL;
861}
862
863
864void
865emigrate_tree_reduce_fn(as_index_ref *r_ref, void *udata)
866{
867 emigration *emig = (emigration *)udata;
868 as_namespace *ns = emig->rsv.ns;
869 as_record *r = r_ref->r;
870
871 if (emig->aborted) {
872 as_record_done(r_ref, ns);
873 return; // no point continuing to reduce this tree
874 }
875
876 if (emig->cluster_key != as_exchange_cluster_key()) {
877 as_record_done(r_ref, ns);
878 emig->aborted = true;
879 cf_atomic32_set(&emig->state, EMIG_STATE_ABORTED);
880 return; // no point continuing to reduce this tree
881 }
882
883 if (! should_emigrate_record(emig, r_ref)) {
884 as_record_done(r_ref, ns);
885 return;
886 }
887
888 msg *m = as_fabric_msg_get(M_TYPE_MIGRATE);
889
890 msg_set_uint32(m, MIG_FIELD_EMIG_ID, emig->id);
891
892 uint32_t info = emigration_pack_info(emig, r);
893
894 if (info != 0) {
895 msg_set_uint32(m, MIG_FIELD_INFO, info);
896 }
897
898 as_storage_rd rd;
899
900 as_storage_record_open(ns, r, &rd);
901
902 // TODO - old pickle - remove old method in "six months".
903 if (as_exchange_min_compatibility_id() >= 3) {
904 emigrate_fill_msg(&rd, m);
905 }
906 else {
907 old_emigrate_fill_msg(&rd, m);
908 }
909
910 as_storage_record_close(&rd);
911 as_record_done(r_ref, ns);
912
913 // This might block if the queues are backed up.
914 emigrate_record(emig, m);
915
916 cf_atomic_int_incr(&ns->migrate_records_transmitted);
917
918 if (ns->migrate_sleep != 0) {
919 usleep(ns->migrate_sleep);
920 }
921
922 uint32_t waits = 0;
923
924 while (cf_atomic32_get(emig->bytes_emigrating) > MAX_BYTES_EMIGRATING &&
925 emig->cluster_key == as_exchange_cluster_key()) {
926 usleep(1000);
927
928 // Temporary paranoia to inform us old nodes aren't acking properly.
929 if (++waits % (ns->migrate_retransmit_ms * 4) == 0) {
930 cf_warning(AS_MIGRATE, "missing acks from node %lx", emig->dest);
931 }
932 }
933}
934
935
936void
937emigrate_fill_msg(as_storage_rd *rd, msg *m)
938{
939 msg_set_uint32(m, MIG_FIELD_OP, OPERATION_INSERT);
940
941 as_storage_record_get_pickle(rd); // FIXME - handle error returned
942
943 msg_set_buf(m, MIG_FIELD_RECORD, rd->pickle, rd->pickle_sz,
944 MSG_SET_HANDOFF_MALLOC);
945}
946
947
// TODO - old pickle - remove in "six months".
//
// Fill an OLD_INSERT message from the open storage record. Unlike
// emigrate_fill_msg(), the record metadata (digest, generation, last-update
// time, void-time, set name, key) goes in separate message fields, and the
// record field carries the buffer returned by as_record_pickle().
//
// Return values of the storage load calls are currently ignored (see TODOs).
void
old_emigrate_fill_msg(as_storage_rd *rd, msg *m)
{
 msg_set_uint32(m, MIG_FIELD_OP, OPERATION_OLD_INSERT);

 as_namespace *ns = rd->ns;
 as_record *r = rd->r;

 // Must come before stack_bins is declared - its size uses rd->n_bins.
 as_storage_rd_load_n_bins(rd); // TODO - handle error returned

 // Zero-length for data-in-memory namespaces, else room for rd->n_bins bins.
 as_bin stack_bins[ns->storage_data_in_memory ? 0 : rd->n_bins];

 as_storage_rd_load_bins(rd, stack_bins); // TODO - handle error returned

 as_storage_record_get_key(rd); // TODO - handle error returned

 const char *set_name = as_index_get_set_name(r, ns);
 uint32_t key_size = rd->key_size;
 // Local copy of the stored key (zero-length when the record has none).
 uint8_t key[key_size];

 if (key_size != 0) {
 memcpy(key, rd->key, key_size);
 }

 msg_set_buf(m, MIG_FIELD_DIGEST, (const uint8_t *)&r->keyd,
 sizeof(cf_digest), MSG_SET_COPY);
 msg_set_uint32(m, MIG_FIELD_GENERATION, r->generation);
 msg_set_uint64(m, MIG_FIELD_LAST_UPDATE_TIME, r->last_update_time);

 // Void-time, set name and key are optional - only set when present.
 if (r->void_time != 0) {
 msg_set_uint32(m, MIG_FIELD_VOID_TIME, r->void_time);
 }

 if (set_name) {
 msg_set_buf(m, MIG_FIELD_SET_NAME, (const uint8_t *)set_name,
 strlen(set_name), MSG_SET_COPY);
 }

 if (key_size != 0) {
 msg_set_buf(m, MIG_FIELD_KEY, key, key_size, MSG_SET_COPY);
 }

 size_t buf_len;
 uint8_t* buf = as_record_pickle(rd, &buf_len);

 // Handoff - the message takes the pickle buffer rather than copying it.
 msg_set_buf(m, MIG_FIELD_RECORD, buf, buf_len, MSG_SET_HANDOFF_MALLOC);
}
996
997
998int
999emigration_reinsert_reduce_fn(const void *key, void *data, void *udata)
1000{
1001 emigration_reinsert_ctrl *ri_ctrl = (emigration_reinsert_ctrl *)data;
1002 as_namespace *ns = ri_ctrl->emig->rsv.ns;
1003 uint64_t now = (uint64_t)udata;
1004
1005 if (ri_ctrl->xmit_ms + ns->migrate_retransmit_ms < now) {
1006 if (as_fabric_retransmit(ri_ctrl->emig->dest, ri_ctrl->m,
1007 AS_FABRIC_CHANNEL_BULK) != AS_FABRIC_SUCCESS) {
1008 return -1; // this will stop the reduce
1009 }
1010
1011 ri_ctrl->xmit_ms = now;
1012 cf_atomic_int_incr(&ns->migrate_record_retransmits);
1013 }
1014
1015 return 0;
1016}
1017
1018
1019void
1020emigrate_record(emigration *emig, msg *m)
1021{
1022 uint64_t insert_id = emig->insert_id++;
1023
1024 msg_set_uint64(m, MIG_FIELD_EMIG_INSERT_ID, insert_id);
1025
1026 emigration_reinsert_ctrl ri_ctrl;
1027
1028 msg_incr_ref(m); // the reference in the hash
1029 ri_ctrl.m = m;
1030 ri_ctrl.emig = emig;
1031 ri_ctrl.xmit_ms = cf_getms();
1032
1033 cf_shash_put(emig->reinsert_hash, &insert_id, &ri_ctrl);
1034
1035 cf_atomic32_add(&emig->bytes_emigrating, (int32_t)msg_get_wire_size(m));
1036
1037 if (as_fabric_send(emig->dest, m, AS_FABRIC_CHANNEL_BULK) !=
1038 AS_FABRIC_SUCCESS) {
1039 as_fabric_msg_put(m);
1040 }
1041}
1042
1043
1044//==========================================================
1045// Local helpers - immigration.
1046//
1047
1048uint32_t
1049immigration_hashfn(const void *value, uint32_t value_len)
1050{
1051 return ((const immigration_hkey *)value)->emig_id;
1052}
1053
1054
1055void *
1056run_immigration_reaper(void *arg)
1057{
1058 while (true) {
1059 cf_rchash_reduce(g_immigration_hash, immigration_reaper_reduce_fn,
1060 NULL);
1061 sleep(1);
1062 }
1063
1064 return NULL;
1065}
1066
1067
1068int
1069immigration_reaper_reduce_fn(const void *key, uint32_t keylen, void *object,
1070 void *udata)
1071{
1072 immigration *immig = (immigration *)object;
1073
1074 if (immig->start_recv_ms == 0) {
1075 // If the start time isn't set, immigration is still being processed.
1076 return CF_RCHASH_OK;
1077 }
1078
1079 if (immig->cluster_key != as_exchange_cluster_key() ||
1080 (immig->done_recv_ms != 0 && cf_getms() > immig->done_recv_ms +
1081 IMMIGRATION_DEBOUNCE_MS)) {
1082 if (immig->start_result == AS_MIGRATE_OK &&
1083 // If we started ok, must be a cluster key change - make sure
1084 // DONE handler doesn't also decrement active counter.
1085 cf_atomic32_incr(&immig->done_recv) == 1) {
1086 as_namespace *ns = immig->rsv.ns;
1087
1088 if (cf_atomic_int_decr(&ns->migrate_rx_partitions_active) < 0) {
1089 cf_warning(AS_MIGRATE, "migrate_rx_partitions_active < 0");
1090 cf_atomic_int_incr(&ns->migrate_rx_partitions_active);
1091 }
1092 }
1093
1094 return CF_RCHASH_REDUCE_DELETE;
1095 }
1096
1097 return CF_RCHASH_OK;
1098}
1099
1100
1101//==========================================================
1102// Local helpers - migrate fabric message handling.
1103//
1104
1105int
1106migrate_receive_msg_cb(cf_node src, msg *m, void *udata)
1107{
1108 uint32_t op;
1109
1110 if (msg_get_uint32(m, MIG_FIELD_OP, &op) != 0) {
1111 cf_warning(AS_MIGRATE, "received message with no op");
1112 as_fabric_msg_put(m);
1113 return 0;
1114 }
1115
1116 switch (op) {
1117 //--------------------------------------------
1118 // Emigration - handle requests:
1119 //
1120 case OPERATION_MERGE_META:
1121 emigration_handle_meta_batch_request(src, m);
1122 break;
1123
1124 //--------------------------------------------
1125 // Immigration - handle requests:
1126 //
1127 case OPERATION_START:
1128 immigration_handle_start_request(src, m);
1129 break;
1130 case OPERATION_INSERT:
1131 immigration_handle_insert_request(src, m);
1132 break;
1133 case OPERATION_OLD_INSERT:
1134 immigration_handle_old_insert_request(src, m);
1135 break;
1136 case OPERATION_DONE:
1137 immigration_handle_done_request(src, m);
1138 break;
1139 case OPERATION_ALL_DONE:
1140 immigration_handle_all_done_request(src, m);
1141 break;
1142
1143 //--------------------------------------------
1144 // Emigration - handle acknowledgments:
1145 //
1146 case OPERATION_INSERT_ACK:
1147 emigration_handle_insert_ack(src, m);
1148 break;
1149 case OPERATION_START_ACK_OK:
1150 case OPERATION_START_ACK_EAGAIN:
1151 case OPERATION_START_ACK_FAIL:
1152 case OPERATION_DONE_ACK:
1153 case OPERATION_ALL_DONE_ACK:
1154 emigration_handle_ctrl_ack(src, m, op);
1155 break;
1156
1157 //--------------------------------------------
1158 // Immigration - handle acknowledgments:
1159 //
1160 case OPERATION_MERGE_META_ACK:
1161 immigration_handle_meta_batch_ack(src, m);
1162 break;
1163
1164 default:
1165 cf_detail(AS_MIGRATE, "received unexpected message op %u", op);
1166 as_fabric_msg_put(m);
1167 break;
1168 }
1169
1170 return 0;
1171}
1172
1173
1174//----------------------------------------------------------
1175// Immigration - request message handling.
1176//
1177
1178void
1179immigration_handle_start_request(cf_node src, msg *m)
1180{
1181 uint32_t emig_id;
1182
1183 if (msg_get_uint32(m, MIG_FIELD_EMIG_ID, &emig_id) != 0) {
1184 cf_warning(AS_MIGRATE, "handle start: msg get for emig id failed");
1185 as_fabric_msg_put(m);
1186 return;
1187 }
1188
1189 uint64_t cluster_key;
1190
1191 if (msg_get_uint64(m, MIG_FIELD_CLUSTER_KEY, &cluster_key) != 0) {
1192 cf_warning(AS_MIGRATE, "handle start: msg get for cluster key failed");
1193 as_fabric_msg_put(m);
1194 return;
1195 }
1196
1197 uint8_t *ns_name;
1198 size_t ns_name_len;
1199
1200 if (msg_get_buf(m, MIG_FIELD_NAMESPACE, &ns_name, &ns_name_len,
1201 MSG_GET_DIRECT) != 0) {
1202 cf_warning(AS_MIGRATE, "handle start: msg get for namespace failed");
1203 as_fabric_msg_put(m);
1204 return;
1205 }
1206
1207 as_namespace *ns = as_namespace_get_bybuf(ns_name, ns_name_len);
1208
1209 if (! ns) {
1210 cf_warning(AS_MIGRATE, "handle start: bad namespace");
1211 as_fabric_msg_put(m);
1212 return;
1213 }
1214
1215 uint32_t pid;
1216
1217 if (msg_get_uint32(m, MIG_FIELD_PARTITION, &pid) != 0) {
1218 cf_warning(AS_MIGRATE, "handle start: msg get for pid failed");
1219 as_fabric_msg_put(m);
1220 return;
1221 }
1222
1223 uint32_t emig_features = 0;
1224
1225 msg_get_uint32(m, MIG_FIELD_FEATURES, &emig_features);
1226
1227 uint64_t emig_n_recs = 0;
1228
1229 msg_get_uint64(m, MIG_FIELD_PARTITION_SIZE, &emig_n_recs);
1230
1231 msg_preserve_fields(m, 1, MIG_FIELD_EMIG_ID);
1232
1233 immigration *immig = cf_rc_alloc(sizeof(immigration));
1234
1235 cf_atomic_int_incr(&ns->migrate_rx_instance_count);
1236
1237 immig->src = src;
1238 immig->cluster_key = cluster_key;
1239 immig->pid = pid;
1240 immig->start_recv_ms = 0;
1241 immig->done_recv = 0;
1242 immig->done_recv_ms = 0;
1243 immig->emig_id = emig_id;
1244 immig->meta_q = meta_out_q_create();
1245 immig->features = MY_MIG_FEATURES;
1246 immig->ns = ns;
1247 immig->rsv.p = NULL;
1248
1249 immigration_hkey hkey;
1250
1251 hkey.src = src;
1252 hkey.emig_id = emig_id;
1253
1254 while (true) {
1255 if (cf_rchash_put_unique(g_immigration_hash, (void *)&hkey,
1256 sizeof(hkey), (void *)immig) == CF_RCHASH_OK) {
1257 cf_rc_reserve(immig); // so either put or get yields ref-count 2
1258
1259 // First start request (not a retransmit) for this pid this round,
1260 // or we had ack'd previous start request with 'EAGAIN'.
1261 immig->start_result = as_partition_immigrate_start(ns, pid,
1262 cluster_key, src);
1263 break;
1264 }
1265
1266 immigration *immig0;
1267
1268 if (cf_rchash_get(g_immigration_hash, (void *)&hkey, sizeof(hkey),
1269 (void *)&immig0) == CF_RCHASH_OK) {
1270 immigration_release(immig); // free just-alloc'd immig ...
1271
1272 if (immig0->start_recv_ms == 0) {
1273 immigration_release(immig0);
1274 return; // allow previous thread to respond
1275 }
1276
1277 if (immig0->cluster_key != cluster_key) {
1278 immigration_release(immig0);
1279 return; // other node reused an immig_id, allow reaper to reap
1280 }
1281
1282 immig = immig0; // ... and use original
1283 break;
1284 }
1285 }
1286
1287 switch (immig->start_result) {
1288 case AS_MIGRATE_OK:
1289 break;
1290 case AS_MIGRATE_FAIL:
1291 immig->start_recv_ms = cf_getms(); // permits reaping
1292 immig->done_recv_ms = immig->start_recv_ms; // permits reaping
1293 immigration_release(immig);
1294 immigration_ack_start_request(src, m, OPERATION_START_ACK_FAIL);
1295 return;
1296 case AS_MIGRATE_AGAIN:
1297 // Remove from hash so that the immig can be tried again.
1298 cf_rchash_delete(g_immigration_hash, (void *)&hkey, sizeof(hkey));
1299 immigration_release(immig);
1300 immigration_ack_start_request(src, m, OPERATION_START_ACK_EAGAIN);
1301 return;
1302 default:
1303 cf_crash(AS_MIGRATE, "unexpected as_partition_immigrate_start result");
1304 break;
1305 }
1306
1307 if (immig->start_recv_ms == 0) {
1308 as_partition_reserve(ns, pid, &immig->rsv);
1309 cf_atomic_int_incr(&immig->rsv.ns->migrate_rx_partitions_active);
1310
1311 if (! immigration_start_meta_sender(immig, emig_features,
1312 emig_n_recs)) {
1313 immig->features &= ~MIG_FEATURE_MERGE;
1314 }
1315
1316 immig->start_recv_ms = cf_getms(); // permits reaping
1317 }
1318
1319 msg_set_uint32(m, MIG_FIELD_FEATURES, immig->features);
1320
1321 immigration_release(immig);
1322 immigration_ack_start_request(src, m, OPERATION_START_ACK_OK);
1323}
1324
1325
1326void
1327immigration_ack_start_request(cf_node src, msg *m, uint32_t op)
1328{
1329 msg_set_uint32(m, MIG_FIELD_OP, op);
1330
1331 if (as_fabric_send(src, m, AS_FABRIC_CHANNEL_CTRL) != AS_FABRIC_SUCCESS) {
1332 as_fabric_msg_put(m);
1333 }
1334}
1335
1336
1337void
1338immigration_handle_insert_request(cf_node src, msg *m)
1339{
1340 uint32_t emig_id;
1341
1342 if (msg_get_uint32(m, MIG_FIELD_EMIG_ID, &emig_id) != 0) {
1343 cf_warning(AS_MIGRATE, "handle insert: msg get for emig id failed");
1344 as_fabric_msg_put(m);
1345 return;
1346 }
1347
1348 immigration_hkey hkey;
1349
1350 hkey.src = src;
1351 hkey.emig_id = emig_id;
1352
1353 immigration *immig;
1354
1355 if (cf_rchash_get(g_immigration_hash, (void *)&hkey, sizeof(hkey),
1356 (void **)&immig) != CF_RCHASH_OK) {
1357 // The immig no longer exists, likely the cluster key advanced and this
1358 // record immigration is from prior round. Do not ack this request.
1359 as_fabric_msg_put(m);
1360 return;
1361 }
1362
1363 if (immig->start_result != AS_MIGRATE_OK || immig->start_recv_ms == 0) {
1364 // If this immigration didn't start and reserve a partition, it's
1365 // likely in the hash on a retransmit and this insert is for the
1366 // original - ignore, and let this immigration proceed.
1367 immigration_release(immig);
1368 as_fabric_msg_put(m);
1369 return;
1370 }
1371
1372 cf_atomic_int_incr(&immig->rsv.ns->migrate_record_receives);
1373
1374 if (immig->cluster_key != as_exchange_cluster_key()) {
1375 immigration_release(immig);
1376 as_fabric_msg_put(m);
1377 return;
1378 }
1379
1380 as_remote_record rr = { .src = src, .rsv = &immig->rsv };
1381
1382 if (msg_get_buf(m, MIG_FIELD_RECORD, &rr.pickle, &rr.pickle_sz,
1383 MSG_GET_DIRECT) != 0) {
1384 cf_warning(AS_MIGRATE, "handle insert: got no record");
1385 immigration_release(immig);
1386 as_fabric_msg_put(m);
1387 return;
1388 }
1389
1390 if (! as_flat_unpack_remote_record_meta(rr.rsv->ns, &rr)) {
1391 cf_warning(AS_MIGRATE, "handle insert: got bad record");
1392 immigration_release(immig);
1393 as_fabric_msg_put(m);
1394 return;
1395 }
1396
1397 uint32_t info = 0;
1398
1399 msg_get_uint32(m, MIG_FIELD_INFO, &info);
1400
1401 immigration_init_repl_state(&rr, info);
1402
1403 int rv = as_record_replace_if_better(&rr, false, false, false);
1404
1405 // If replace failed, don't ack - it will be retransmitted.
1406 if (! (rv == AS_OK ||
1407 // Migrations just treat these errors as successful no-ops:
1408 rv == AS_ERR_RECORD_EXISTS || rv == AS_ERR_GENERATION)) {
1409 immigration_release(immig);
1410 as_fabric_msg_put(m);
1411 return;
1412 }
1413
1414 immigration_release(immig);
1415
1416 msg_preserve_fields(m, 2, MIG_FIELD_EMIG_INSERT_ID, MIG_FIELD_EMIG_ID);
1417
1418 msg_set_uint32(m, MIG_FIELD_OP, OPERATION_INSERT_ACK);
1419
1420 if (as_fabric_send(src, m, AS_FABRIC_CHANNEL_BULK) != AS_FABRIC_SUCCESS) {
1421 as_fabric_msg_put(m);
1422 }
1423}
1424
1425
1426// TODO - old pickle - remove in "six months".
1427void
1428immigration_handle_old_insert_request(cf_node src, msg *m)
1429{
1430 uint32_t emig_id;
1431
1432 if (msg_get_uint32(m, MIG_FIELD_EMIG_ID, &emig_id) != 0) {
1433 cf_warning(AS_MIGRATE, "handle insert: msg get for emig id failed");
1434 as_fabric_msg_put(m);
1435 return;
1436 }
1437
1438 immigration_hkey hkey;
1439
1440 hkey.src = src;
1441 hkey.emig_id = emig_id;
1442
1443 immigration *immig;
1444
1445 if (cf_rchash_get(g_immigration_hash, (void *)&hkey, sizeof(hkey),
1446 (void **)&immig) != CF_RCHASH_OK) {
1447 // The immig no longer exists, likely the cluster key advanced and this
1448 // record immigration is from prior round. Do not ack this request.
1449 as_fabric_msg_put(m);
1450 return;
1451 }
1452
1453 if (immig->start_result != AS_MIGRATE_OK || immig->start_recv_ms == 0) {
1454 // If this immigration didn't start and reserve a partition, it's
1455 // likely in the hash on a retransmit and this insert is for the
1456 // original - ignore, and let this immigration proceed.
1457 immigration_release(immig);
1458 as_fabric_msg_put(m);
1459 return;
1460 }
1461
1462 cf_atomic_int_incr(&immig->rsv.ns->migrate_record_receives);
1463
1464 if (immig->cluster_key != as_exchange_cluster_key()) {
1465 immigration_release(immig);
1466 as_fabric_msg_put(m);
1467 return;
1468 }
1469
1470 as_remote_record rr =
1471 { .src = src, .rsv = &immig->rsv, .is_old_pickle = true };
1472
1473 if (msg_get_buf(m, MIG_FIELD_DIGEST, (uint8_t **)&rr.keyd, NULL,
1474 MSG_GET_DIRECT) != 0) {
1475 cf_warning(AS_MIGRATE, "handle insert: got no digest");
1476 as_fabric_msg_put(m);
1477 return;
1478 }
1479
1480 if (msg_get_buf(m, MIG_FIELD_RECORD, &rr.pickle, &rr.pickle_sz,
1481 MSG_GET_DIRECT) != 0 || rr.pickle_sz < 2) {
1482 cf_warning(AS_MIGRATE, "handle insert: got no or bad record");
1483 immigration_release(immig);
1484 as_fabric_msg_put(m);
1485 return;
1486 }
1487
1488 if (msg_get_uint32(m, MIG_FIELD_GENERATION, &rr.generation) != 0 ||
1489 rr.generation == 0) {
1490 cf_warning(AS_MIGRATE, "handle insert: got no or bad generation");
1491 immigration_release(immig);
1492 as_fabric_msg_put(m);
1493 return;
1494 }
1495
1496 if (msg_get_uint64(m, MIG_FIELD_LAST_UPDATE_TIME,
1497 &rr.last_update_time) != 0) {
1498 cf_warning(AS_MIGRATE, "handle insert: got no last-update-time");
1499 immigration_release(immig);
1500 as_fabric_msg_put(m);
1501 return;
1502 }
1503
1504 msg_get_uint32(m, MIG_FIELD_VOID_TIME, &rr.void_time);
1505
1506 msg_get_buf(m, MIG_FIELD_SET_NAME, (uint8_t **)&rr.set_name,
1507 &rr.set_name_len, MSG_GET_DIRECT);
1508
1509 msg_get_buf(m, MIG_FIELD_KEY, (uint8_t **)&rr.key, &rr.key_size,
1510 MSG_GET_DIRECT);
1511
1512 uint32_t info = 0;
1513
1514 msg_get_uint32(m, MIG_FIELD_INFO, &info);
1515
1516 if (immigration_ignore_pickle(rr.pickle, info)) {
1517 cf_warning_digest(AS_MIGRATE, rr.keyd, "handle insert: binless pickle ");
1518 }
1519 else {
1520 immigration_init_repl_state(&rr, info);
1521
1522 int rv = as_record_replace_if_better(&rr, false, false, false);
1523
1524 // If replace failed, don't ack - it will be retransmitted.
1525 if (! (rv == AS_OK ||
1526 // Migrations just treat these errors as successful no-ops:
1527 rv == AS_ERR_RECORD_EXISTS || rv == AS_ERR_GENERATION)) {
1528 immigration_release(immig);
1529 as_fabric_msg_put(m);
1530 return;
1531 }
1532 }
1533
1534 immigration_release(immig);
1535
1536 msg_preserve_fields(m, 2, MIG_FIELD_EMIG_INSERT_ID, MIG_FIELD_EMIG_ID);
1537
1538 msg_set_uint32(m, MIG_FIELD_OP, OPERATION_INSERT_ACK);
1539
1540 if (as_fabric_send(src, m, AS_FABRIC_CHANNEL_BULK) != AS_FABRIC_SUCCESS) {
1541 as_fabric_msg_put(m);
1542 }
1543}
1544
1545
1546void
1547immigration_handle_done_request(cf_node src, msg *m)
1548{
1549 uint32_t emig_id;
1550
1551 if (msg_get_uint32(m, MIG_FIELD_EMIG_ID, &emig_id) != 0) {
1552 cf_warning(AS_MIGRATE, "handle done: msg get for emig id failed");
1553 as_fabric_msg_put(m);
1554 return;
1555 }
1556
1557 msg_preserve_fields(m, 1, MIG_FIELD_EMIG_ID);
1558
1559 // See if this migration already exists & has been notified.
1560 immigration_hkey hkey;
1561
1562 hkey.src = src;
1563 hkey.emig_id = emig_id;
1564
1565 immigration *immig;
1566
1567 if (cf_rchash_get(g_immigration_hash, (void *)&hkey, sizeof(hkey),
1568 (void **)&immig) == CF_RCHASH_OK) {
1569 if (immig->start_result != AS_MIGRATE_OK || immig->start_recv_ms == 0) {
1570 // If this immigration didn't start and reserve a partition, it's
1571 // likely in the hash on a retransmit and this DONE is for the
1572 // original - ignore, and let this immigration proceed.
1573 immigration_release(immig);
1574 as_fabric_msg_put(m);
1575 return;
1576 }
1577
1578 if (cf_atomic32_incr(&immig->done_recv) == 1) {
1579 // Record the time of the first DONE received.
1580 immig->done_recv_ms = cf_getms();
1581
1582 as_namespace *ns = immig->rsv.ns;
1583
1584 if (cf_atomic_int_decr(&ns->migrate_rx_partitions_active) < 0) {
1585 cf_warning(AS_MIGRATE, "migrate_rx_partitions_active < 0");
1586 cf_atomic_int_incr(&ns->migrate_rx_partitions_active);
1587 }
1588
1589 as_partition_immigrate_done(ns, immig->rsv.p->id,
1590 immig->cluster_key, immig->src);
1591 }
1592 // else - was likely a retransmitted done message.
1593
1594 immigration_release(immig);
1595 }
1596 // else - garbage, or super-stale retransmitted done message.
1597
1598 msg_set_uint32(m, MIG_FIELD_OP, OPERATION_DONE_ACK);
1599
1600 if (as_fabric_send(src, m, AS_FABRIC_CHANNEL_CTRL) != AS_FABRIC_SUCCESS) {
1601 as_fabric_msg_put(m);
1602 }
1603}
1604
1605
1606void
1607immigration_handle_all_done_request(cf_node src, msg *m)
1608{
1609 uint32_t emig_id;
1610
1611 if (msg_get_uint32(m, MIG_FIELD_EMIG_ID, &emig_id) != 0) {
1612 cf_warning(AS_MIGRATE, "handle all done: msg get for emig id failed");
1613 as_fabric_msg_put(m);
1614 return;
1615 }
1616
1617 uint64_t cluster_key;
1618
1619 if (msg_get_uint64(m, MIG_FIELD_CLUSTER_KEY, &cluster_key) != 0) {
1620 cf_warning(AS_MIGRATE, "handle all done: msg get for cluster key failed");
1621 as_fabric_msg_put(m);
1622 return;
1623 }
1624
1625 uint8_t *ns_name;
1626 size_t ns_name_len;
1627
1628 if (msg_get_buf(m, MIG_FIELD_NAMESPACE, &ns_name, &ns_name_len,
1629 MSG_GET_DIRECT) != 0) {
1630 cf_warning(AS_MIGRATE, "handle all done: msg get for namespace failed");
1631 as_fabric_msg_put(m);
1632 return;
1633 }
1634
1635 as_namespace *ns = as_namespace_get_bybuf(ns_name, ns_name_len);
1636
1637 if (! ns) {
1638 cf_warning(AS_MIGRATE, "handle all done: bad namespace");
1639 as_fabric_msg_put(m);
1640 return;
1641 }
1642
1643 uint32_t pid;
1644
1645 if (msg_get_uint32(m, MIG_FIELD_PARTITION, &pid) != 0) {
1646 cf_warning(AS_MIGRATE, "handle all done: msg get for pid failed");
1647 as_fabric_msg_put(m);
1648 return;
1649 }
1650
1651 msg_preserve_fields(m, 1, MIG_FIELD_EMIG_ID);
1652
1653 // TODO - optionally, for replicas we might use this to remove immig objects
1654 // from hash and deprecate timer...
1655
1656 if (as_partition_migrations_all_done(ns, pid, cluster_key) !=
1657 AS_MIGRATE_OK) {
1658 as_fabric_msg_put(m);
1659 return;
1660 }
1661
1662 msg_set_uint32(m, MIG_FIELD_OP, OPERATION_ALL_DONE_ACK);
1663
1664 if (as_fabric_send(src, m, AS_FABRIC_CHANNEL_CTRL) != AS_FABRIC_SUCCESS) {
1665 as_fabric_msg_put(m);
1666 }
1667}
1668
1669
1670//----------------------------------------------------------
1671// Emigration - acknowledgment message handling.
1672//
1673
1674void
1675emigration_handle_insert_ack(cf_node src, msg *m)
1676{
1677 uint32_t emig_id;
1678
1679 if (msg_get_uint32(m, MIG_FIELD_EMIG_ID, &emig_id) != 0) {
1680 cf_warning(AS_MIGRATE, "insert ack: msg get for emig id failed");
1681 as_fabric_msg_put(m);
1682 return;
1683 }
1684
1685 emigration *emig;
1686
1687 if (cf_rchash_get(g_emigration_hash, (void *)&emig_id, sizeof(emig_id),
1688 (void **)&emig) != CF_RCHASH_OK) {
1689 // Probably came from a migration prior to the latest rebalance.
1690 as_fabric_msg_put(m);
1691 return;
1692 }
1693
1694 uint64_t insert_id;
1695
1696 if (msg_get_uint64(m, MIG_FIELD_EMIG_INSERT_ID, &insert_id) != 0) {
1697 cf_warning(AS_MIGRATE, "insert ack: msg get for emig insert id failed");
1698 emigration_release(emig);
1699 as_fabric_msg_put(m);
1700 return;
1701 }
1702
1703 emigration_reinsert_ctrl *ri_ctrl = NULL;
1704 cf_mutex *vlock;
1705
1706 if (cf_shash_get_vlock(emig->reinsert_hash, &insert_id, (void **)&ri_ctrl,
1707 &vlock) == CF_SHASH_OK) {
1708 if (src == emig->dest) {
1709 if (cf_atomic32_sub(&emig->bytes_emigrating,
1710 (int32_t)msg_get_wire_size(ri_ctrl->m)) < 0) {
1711 cf_warning(AS_MIGRATE, "bytes_emigrating less than zero");
1712 }
1713
1714 as_fabric_msg_put(ri_ctrl->m);
1715 // At this point, the rt is *GONE*.
1716 cf_shash_delete_lockfree(emig->reinsert_hash, &insert_id);
1717 ri_ctrl = NULL;
1718 }
1719 else {
1720 cf_warning(AS_MIGRATE, "insert ack: unexpected source %lx", src);
1721 }
1722
1723 cf_mutex_unlock(vlock);
1724 }
1725
1726 emigration_release(emig);
1727 as_fabric_msg_put(m);
1728}
1729
1730
1731void
1732emigration_handle_ctrl_ack(cf_node src, msg *m, uint32_t op)
1733{
1734 uint32_t emig_id;
1735
1736 if (msg_get_uint32(m, MIG_FIELD_EMIG_ID, &emig_id) != 0) {
1737 cf_warning(AS_MIGRATE, "ctrl ack: msg get for emig id failed");
1738 as_fabric_msg_put(m);
1739 return;
1740 }
1741
1742 uint32_t immig_features = 0;
1743
1744 msg_get_uint32(m, MIG_FIELD_FEATURES, &immig_features);
1745
1746 as_fabric_msg_put(m);
1747
1748 emigration *emig;
1749
1750 if (cf_rchash_get(g_emigration_hash, (void *)&emig_id, sizeof(emig_id),
1751 (void **)&emig) == CF_RCHASH_OK) {
1752 if (emig->dest == src) {
1753 if ((immig_features & MIG_FEATURE_MERGE) == 0) {
1754 // TODO - rethink where this should go after further refactor.
1755 if (op == OPERATION_START_ACK_OK && emig->meta_q) {
1756 meta_in_q_rejected(emig->meta_q);
1757 }
1758 }
1759
1760 cf_queue_push(emig->ctrl_q, &op);
1761 }
1762 else {
1763 cf_warning(AS_MIGRATE, "ctrl ack (%d): unexpected source %lx", op,
1764 src);
1765 }
1766
1767 emigration_release(emig);
1768 }
1769 else {
1770 cf_detail(AS_MIGRATE, "ctrl ack (%d): can't find emig id %u", op,
1771 emig_id);
1772 }
1773}
1774
1775
1776//==========================================================
1777// Local helpers - info API helpers.
1778//
1779
1780int
1781emigration_dump_reduce_fn(const void *key, uint32_t keylen, void *object,
1782 void *udata)
1783{
1784 uint32_t emig_id = *(const uint32_t *)key;
1785 emigration *emig = (emigration *)object;
1786 int *item_num = (int *)udata;
1787
1788 cf_info(AS_MIGRATE, "[%d]: mig_id %u : id %u ; ck %lx", *item_num, emig_id,
1789 emig->id, emig->cluster_key);
1790
1791 *item_num += 1;
1792
1793 return 0;
1794}
1795
1796
1797int
1798immigration_dump_reduce_fn(const void *key, uint32_t keylen, void *object,
1799 void *udata)
1800{
1801 const immigration_hkey *hkey = (const immigration_hkey *)key;
1802 immigration *immig = (immigration *)object;
1803 int *item_num = (int *)udata;
1804
1805 cf_info(AS_MIGRATE, "[%d]: src %016lx ; id %u : src %016lx ; done recv %u ; start recv ms %lu ; done recv ms %lu ; ck %lx",
1806 *item_num, hkey->src, hkey->emig_id, immig->src, immig->done_recv,
1807 immig->start_recv_ms, immig->done_recv_ms, immig->cluster_key);
1808
1809 *item_num += 1;
1810
1811 return 0;
1812}
1813