/*
 * partition_balance.c
 *
 * Copyright (C) 2016-2019 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/
 */

//==========================================================
// Includes.
//

#include "fabric/partition_balance.h"

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_atomic.h"
#include "citrusleaf/cf_hash_math.h"
#include "citrusleaf/cf_queue.h"

#include "cf_mutex.h"
#include "fault.h"
#include "node.h"

#include "base/cfg.h"
#include "base/datamodel.h"
#include "base/index.h"
#include "fabric/exchange.h"
#include "fabric/hb.h"
#include "fabric/migrate.h"
#include "fabric/partition.h"
#include "storage/storage.h"


//==========================================================
// Typedefs & constants.
//

const as_partition_version ZERO_VERSION = { 0 };


//==========================================================
// Globals.
//

cf_atomic32 g_partition_generation = (uint32_t)-1;
uint64_t g_rebalance_sec;
uint64_t g_rebalance_generation = 0;

// Using int for 4-byte size, but maintaining bool semantics.
// TODO - ok as non-volatile, but should selectively load/store in the future.
static int g_init_balance_done = false;

static cf_atomic32 g_migrate_num_incoming = 0;

// Using int for 4-byte size, but maintaining bool semantics.
volatile int g_allow_migrations = false;

uint64_t g_hashed_pids[AS_PARTITIONS];

// Shortcuts to values set by as_exchange, for use in partition balance only.
uint32_t g_cluster_size = 0;
cf_node* g_succession = NULL;

cf_node g_full_node_seq_table[AS_CLUSTER_SZ * AS_PARTITIONS];
sl_ix_t g_full_sl_ix_table[AS_CLUSTER_SZ * AS_PARTITIONS];


//==========================================================
// Forward declarations.
//

// Only partition_balance hooks into exchange.
extern cf_node* as_exchange_succession_unsafe();

// Helpers - generic.
void create_trees(as_partition* p, as_namespace* ns);
void drop_trees(as_partition* p);

// Helpers - balance partitions.
void fill_global_tables();
void apply_single_replica_limit_ap(as_namespace* ns);
int find_working_master_ap(const as_partition* p, const sl_ix_t* ns_sl_ix, const as_namespace* ns);
uint32_t find_duplicates_ap(const as_partition* p, const cf_node* ns_node_seq, const sl_ix_t* ns_sl_ix, const as_namespace* ns, uint32_t working_master_n, cf_node dupls[]);
void advance_version_ap(as_partition* p, const sl_ix_t* ns_sl_ix, as_namespace* ns, uint32_t self_n, uint32_t working_master_n, uint32_t n_dupl, const cf_node dupls[]);
uint32_t fill_family_versions(const as_partition* p, const sl_ix_t* ns_sl_ix, const as_namespace* ns, uint32_t working_master_n, uint32_t n_dupl, const cf_node dupls[], as_partition_version family_versions[]);
bool has_replica_parent(const as_partition* p, const sl_ix_t* ns_sl_ix, const as_namespace* ns, const as_partition_version* subset_version, uint32_t subset_n);
uint32_t find_family(const as_partition_version* self_version, uint32_t n_families, const as_partition_version family_versions[]);

// Helpers - migration-related.
bool partition_immigration_is_valid(const as_partition* p, cf_node source_node, const as_namespace* ns, const char* tag);


//==========================================================
// Inlines & macros.
//

static inline bool
is_self_final_master(const as_partition* p)
{
	return p->replicas[0] == g_config.self_node;
}

static inline bool
is_family_same(const as_partition_version* v1, const as_partition_version* v2)
{
	return v1->ckey == v2->ckey && v1->family == v2->family &&
			v1->family != VERSION_FAMILY_UNIQUE;
}
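
// Illustrative note (added commentary): versions match as a family only when
// both ckey and family agree and the family is not the VERSION_FAMILY_UNIQUE
// sentinel - e.g. two versions with ckey 0xAB and family 2 are family-same,
// but two versions carrying VERSION_FAMILY_UNIQUE never match each other,
// since "unique" means comparable to nothing.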


//==========================================================
// Public API - regulate migrations.
//

void
as_partition_balance_disallow_migrations()
{
	cf_detail(AS_PARTITION, "disallow migrations");

	g_allow_migrations = false;
}

bool
as_partition_balance_are_migrations_allowed()
{
	return g_allow_migrations;
}

void
as_partition_balance_synchronize_migrations()
{
	// Acquire and release each partition lock to ensure threads acquiring a
	// partition lock after this will be forced to check the latest cluster
	// key.
	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
			as_partition* p = &ns->partitions[pid];

			cf_mutex_lock(&p->lock);
			cf_mutex_unlock(&p->lock);
		}
	}

	// Prior-round migrations won't decrement g_migrate_num_incoming due to
	// cluster key check.
	cf_atomic32_set(&g_migrate_num_incoming, 0);
}
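
// Illustrative sketch (added commentary, not part of the original source) of
// the reader-side pattern the lock/unlock pass above synchronizes with - a
// transaction or migration thread roughly does:
//
//	cf_mutex_lock(&p->lock);
//
//	if (orig_cluster_key != as_exchange_cluster_key()) {
//		cf_mutex_unlock(&p->lock);
//		return; // cluster changed - abandon the operation
//	}
//	// ... safe to proceed under the current cluster key ...
//
// Any thread that acquires a partition lock after the pass completes is
// guaranteed to observe the new cluster key before acting.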


//==========================================================
// Public API - balance partitions.
//

void
as_partition_balance_init()
{
	// Cache hashed pids for all future rebalances.
	for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
		g_hashed_pids[pid] = cf_hash_fnv64((const uint8_t*)&pid,
				sizeof(uint32_t));
	}

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		uint32_t n_stored = 0;

		for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
			as_partition* p = &ns->partitions[pid];

			as_storage_load_pmeta(ns, p);

			if (as_partition_version_has_data(&p->version)) {
				as_partition_isolate_version(ns, p);
				n_stored++;
			}
		}

		cf_info(AS_PARTITION, "{%s} %u partitions: found %u absent, %u stored",
				ns->name, AS_PARTITIONS, AS_PARTITIONS - n_stored, n_stored);
	}

	partition_balance_init();
}

// Has the node resolved as operating either in a multi-node cluster or as a
// single-node cluster?
bool
as_partition_balance_is_init_resolved()
{
	return g_init_balance_done;
}

void
as_partition_balance_revert_to_orphan()
{
	g_init_balance_done = false;
	g_allow_migrations = false;

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		client_replica_maps_clear(ns);

		for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
			as_partition* p = &ns->partitions[pid];

			cf_mutex_lock(&p->lock);

			as_partition_freeze(p);
			as_partition_isolate_version(ns, p);

			cf_mutex_unlock(&p->lock);
		}

		ns->n_unavailable_partitions = AS_PARTITIONS;
	}

	cf_atomic32_incr(&g_partition_generation);
}

void
as_partition_balance()
{
	// Temporary paranoia.
	static uint64_t last_cluster_key = 0;

	if (last_cluster_key == as_exchange_cluster_key()) {
		cf_warning(AS_PARTITION, "as_partition_balance: cluster key %lx same as last time",
				last_cluster_key);
		return;
	}

	last_cluster_key = as_exchange_cluster_key();
	// End - temporary paranoia.

	// These shortcuts must only be used within the scope of this function.
	g_cluster_size = as_exchange_cluster_size();
	g_succession = as_exchange_succession_unsafe();

	// Each partition separately shuffles the node succession list to generate
	// its own node sequence.
	fill_global_tables();

	cf_queue mq;

	cf_queue_init(&mq, sizeof(pb_task), g_config.n_namespaces * AS_PARTITIONS,
			false);

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		balance_namespace(g_config.namespaces[ns_ix], &mq);
	}

	prepare_for_appeals();

	// All partitions now have replicas assigned, ok to allow transactions.
	g_init_balance_done = true;
	cf_atomic32_incr(&g_partition_generation);

	g_allow_migrations = true;
	cf_detail(AS_PARTITION, "allow migrations");

	g_rebalance_sec = cf_get_seconds(); // must precede process_pb_tasks()

	process_pb_tasks(&mq);
	cf_queue_destroy(&mq);

	g_rebalance_generation++;
}

uint64_t
as_partition_balance_remaining_migrations()
{
	uint64_t remaining_migrations = 0;

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		remaining_migrations += ns->migrate_tx_partitions_remaining;
		remaining_migrations += ns->migrate_rx_partitions_remaining;
	}

	return remaining_migrations;
}
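
// Usage sketch (added commentary): a monitoring loop might treat a return of
// zero, combined with an advanced g_rebalance_generation, as "migrations for
// the latest rebalance have drained" - e.g.:
//
//	if (as_partition_balance_remaining_migrations() == 0 &&
//			g_rebalance_generation != last_seen_generation) {
//		last_seen_generation = g_rebalance_generation; // hypothetical tracker
//	}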


//==========================================================
// Public API - migration-related as_partition methods.
//

// Currently used only for enterprise build.
bool
as_partition_pending_migrations(as_partition* p)
{
	cf_mutex_lock(&p->lock);

	bool pending = p->pending_immigrations + p->pending_emigrations != 0;

	cf_mutex_unlock(&p->lock);

	return pending;
}

void
as_partition_emigrate_done(as_namespace* ns, uint32_t pid,
		uint64_t orig_cluster_key, cf_node dest_node, uint32_t tx_flags)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! g_allow_migrations || orig_cluster_key != as_exchange_cluster_key()) {
		cf_debug(AS_PARTITION, "{%s:%u} emigrate_done - cluster key mismatch",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return;
	}

	if (p->pending_emigrations == 0) {
		cf_warning(AS_PARTITION, "{%s:%u} emigrate_done - no pending emigrations",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return;
	}

	p->pending_emigrations--;

	int64_t migrates_tx_remaining =
			cf_atomic_int_decr(&ns->migrate_tx_partitions_remaining);

	if (migrates_tx_remaining < 0) {
		cf_warning(AS_PARTITION, "{%s:%u} (%hu,%ld) emigrate_done - counter went negative",
				ns->name, pid, p->pending_emigrations, migrates_tx_remaining);
	}

	if ((tx_flags & TX_FLAGS_LEAD) != 0) {
		p->pending_lead_emigrations--;
		cf_atomic_int_decr(&ns->migrate_tx_partitions_lead_remaining);
	}

	if (! is_self_final_master(p)) {
		emigrate_done_advance_non_master_version(ns, p, tx_flags);
	}

	int dest_ix = index_of_node(p->replicas, p->n_replicas, dest_node);

	cf_assert(dest_ix != -1, AS_PARTITION, "non-replica dest node");

	p->immigrators[dest_ix] = false;

	if (client_replica_maps_update(ns, pid)) {
		cf_atomic32_incr(&g_partition_generation);
	}

	cf_queue mq;
	pb_task task;
	int w_ix = -1;

	if (is_self_final_master(p) &&
			p->pending_emigrations == 0 && p->pending_immigrations == 0) {
		cf_queue_init(&mq, sizeof(pb_task), p->n_witnesses, false);

		for (w_ix = 0; w_ix < (int)p->n_witnesses; w_ix++) {
			pb_task_init(&task, p->witnesses[w_ix], ns, pid, orig_cluster_key,
					PB_TASK_EMIG_SIGNAL_ALL_DONE, TX_FLAGS_CONTINGENT);
			cf_queue_push(&mq, &task);
		}
	}

	cf_mutex_unlock(&p->lock);

	if (w_ix >= 0) {
		while (cf_queue_pop(&mq, &task, CF_QUEUE_NOWAIT) == CF_QUEUE_OK) {
			as_migrate_emigrate(&task);
		}

		cf_queue_destroy(&mq);
	}
}

as_migrate_result
as_partition_immigrate_start(as_namespace* ns, uint32_t pid,
		uint64_t orig_cluster_key, cf_node source_node)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! g_allow_migrations || orig_cluster_key != as_exchange_cluster_key() ||
			immigrate_yield()) {
		cf_debug(AS_PARTITION, "{%s:%u} immigrate_start - cluster key mismatch",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_AGAIN;
	}

	uint32_t num_incoming = (uint32_t)cf_atomic32_incr(&g_migrate_num_incoming);

	if (num_incoming > g_config.migrate_max_num_incoming) {
		cf_debug(AS_PARTITION, "{%s:%u} immigrate_start - exceeded max_num_incoming",
				ns->name, pid);
		cf_atomic32_decr(&g_migrate_num_incoming);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_AGAIN;
	}

	if (! partition_immigration_is_valid(p, source_node, ns, "start")) {
		cf_atomic32_decr(&g_migrate_num_incoming);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_FAIL;
	}

	if (! is_self_final_master(p)) {
		immigrate_start_advance_non_master_version(ns, p);
		as_storage_save_pmeta(ns, p);
	}

	cf_mutex_unlock(&p->lock);

	return AS_MIGRATE_OK;
}

as_migrate_result
as_partition_immigrate_done(as_namespace* ns, uint32_t pid,
		uint64_t orig_cluster_key, cf_node source_node)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! g_allow_migrations || orig_cluster_key != as_exchange_cluster_key()) {
		cf_debug(AS_PARTITION, "{%s:%u} immigrate_done - cluster key mismatch",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_FAIL;
	}

	cf_atomic32_decr(&g_migrate_num_incoming);

	if (! partition_immigration_is_valid(p, source_node, ns, "done")) {
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_FAIL;
	}

	p->pending_immigrations--;

	int64_t migrates_rx_remaining =
			cf_atomic_int_decr(&ns->migrate_rx_partitions_remaining);

	// Sanity-check only.
	if (migrates_rx_remaining < 0) {
		cf_warning(AS_PARTITION, "{%s:%u} (%hu,%ld) immigrate_done - counter went negative",
				ns->name, pid, p->pending_immigrations, migrates_rx_remaining);
	}

	if (p->pending_immigrations == 0 &&
			! as_partition_version_same(&p->version, &p->final_version)) {
		p->version = p->final_version;
		as_storage_save_pmeta(ns, p);
	}

	if (! is_self_final_master(p)) {
		if (client_replica_maps_update(ns, pid)) {
			cf_atomic32_incr(&g_partition_generation);
		}

		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_OK;
	}

	// Final master finished an immigration, adjust duplicates.

	if (source_node == p->working_master) {
		p->working_master = g_config.self_node;

		immigrate_done_advance_final_master_version(ns, p);
	}
	else {
		p->n_dupl = remove_node(p->dupls, p->n_dupl, source_node);
	}

	if (client_replica_maps_update(ns, pid)) {
		cf_atomic32_incr(&g_partition_generation);
	}

	if (p->pending_immigrations != 0) {
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_OK;
	}

	// Final master finished all immigration.

	cf_queue mq;
	pb_task task;

	if (p->pending_emigrations != 0) {
		cf_queue_init(&mq, sizeof(pb_task), p->n_replicas - 1, false);

		for (uint32_t repl_ix = 1; repl_ix < p->n_replicas; repl_ix++) {
			if (p->immigrators[repl_ix]) {
				pb_task_init(&task, p->replicas[repl_ix], ns, pid,
						orig_cluster_key, PB_TASK_EMIG_TRANSFER,
						TX_FLAGS_CONTINGENT);
				cf_queue_push(&mq, &task);
			}
		}
	}
	else {
		cf_queue_init(&mq, sizeof(pb_task), p->n_witnesses, false);

		for (uint16_t w_ix = 0; w_ix < p->n_witnesses; w_ix++) {
			pb_task_init(&task, p->witnesses[w_ix], ns, pid, orig_cluster_key,
					PB_TASK_EMIG_SIGNAL_ALL_DONE, TX_FLAGS_CONTINGENT);
			cf_queue_push(&mq, &task);
		}
	}

	cf_mutex_unlock(&p->lock);

	while (cf_queue_pop(&mq, &task, CF_QUEUE_NOWAIT) == CF_QUEUE_OK) {
		as_migrate_emigrate(&task);
	}

	cf_queue_destroy(&mq);

	return AS_MIGRATE_OK;
}

as_migrate_result
as_partition_migrations_all_done(as_namespace* ns, uint32_t pid,
		uint64_t orig_cluster_key)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! g_allow_migrations || orig_cluster_key != as_exchange_cluster_key()) {
		cf_debug(AS_PARTITION, "{%s:%u} all_done - cluster key mismatch",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_FAIL;
	}

	if (p->pending_emigrations != 0) {
		cf_debug(AS_PARTITION, "{%s:%u} all_done - eagain",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_AGAIN;
	}

	// Not a replica and non-null version ...
	if (! is_self_replica(p) && ! as_partition_version_is_null(&p->version)) {
		// ... and not quiesced - drop partition.
		if (drop_superfluous_version(p, ns)) {
			drop_trees(p);
			as_storage_save_pmeta(ns, p);
		}
		// ... or quiesced more than one node - become subset of final version.
		else if (adjust_superfluous_version(p, ns)) {
			as_storage_save_pmeta(ns, p);
		}
	}

	cf_mutex_unlock(&p->lock);

	return AS_MIGRATE_OK;
}

void
as_partition_signal_done(as_namespace* ns, uint32_t pid,
		uint64_t orig_cluster_key)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! g_allow_migrations || orig_cluster_key != as_exchange_cluster_key()) {
		cf_debug(AS_PARTITION, "{%s:%u} signal_done - cluster key mismatch",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return;
	}

	cf_atomic_int_decr(&ns->migrate_signals_remaining);

	cf_mutex_unlock(&p->lock);
}


//==========================================================
// Local helpers - generic.
//

void
pb_task_init(pb_task* task, cf_node dest, as_namespace* ns,
		uint32_t pid, uint64_t cluster_key, pb_task_type type,
		uint32_t tx_flags)
{
	task->dest = dest;
	task->ns = ns;
	task->pid = pid;
	task->type = type;
	task->tx_flags = tx_flags;
	task->cluster_key = cluster_key;
}

void
create_trees(as_partition* p, as_namespace* ns)
{
	cf_assert(! p->tree, AS_PARTITION, "unexpected - tree already exists");

	as_partition_advance_tree_id(p, ns->name);

	p->tree = as_index_tree_create(&ns->tree_shared, p->tree_id,
			as_partition_tree_done, (void*)p);
}

void
drop_trees(as_partition* p)
{
	if (! p->tree) {
		return; // CP signals can get here - 0e/0r versions are witnesses
	}

	as_index_tree_release(p->tree);
	p->tree = NULL;

	// TODO - consider p->n_tombstones?
	cf_atomic32_set(&p->max_void_time, 0);
}


//==========================================================
// Local helpers - balance partitions.
//

// Succession list - all nodes in cluster
// +---------------+
// | A | B | C | D |
// +---------------+
//
// Succession list index (sl_ix) - used as version table and rack-id index
// +---------------+
// | 0 | 1 | 2 | 3 |
// +---------------+
//
// Every partition shuffles the succession list independently, e.g. for pid 0:
// Hash the node names with the pid:
//  H(A,0) = Y, H(B,0) = X, H(C,0) = W, H(D,0) = Z
// Store sl_ix in last byte of hash results so it doesn't affect sort:
//  +-----------------------+
//  | Y_0 | X_1 | W_2 | Z_3 |
//  +-----------------------+
// This sorts to:
//  +-----------------------+
//  | W_2 | X_1 | Y_0 | Z_3 |
//  +-----------------------+
// Replace original node names, and keep sl_ix order, resulting in:
//  +---------------+    +---------------+
//  | C | B | A | D |    | 2 | 1 | 0 | 3 |
//  +---------------+    +---------------+
//
//  Node sequence table        Succession list index table
//   pid                        pid
//  +===+---------------+      +===+---------------+
//  | 0 | C | B | A | D |      | 0 | 2 | 1 | 0 | 3 |
//  +===+---------------+      +===+---------------+
//  | 1 | A | D | C | B |      | 1 | 0 | 3 | 2 | 1 |
//  +===+---------------+      +===+---------------+
//  | 2 | D | C | B | A |      | 2 | 3 | 2 | 1 | 0 |
//  +===+---------------+      +===+---------------+
//  | 3 | B | A | D | C |      | 3 | 1 | 0 | 3 | 2 |
//  +===+---------------+      +===+---------------+
//  | 4 | D | B | C | A |      | 4 | 3 | 1 | 2 | 0 |
//  +===+---------------+      +===+---------------+
//  ... to pid 4095.
//
// We keep the succession list index table so we can refer back to namespaces'
// partition version tables and rack-id lists, where nodes are in the original
// succession list order.
void
fill_global_tables()
{
	uint64_t hashed_nodes[g_cluster_size];

	for (uint32_t n = 0; n < g_cluster_size; n++) {
		hashed_nodes[n] = cf_hash_fnv64((const uint8_t*)&g_succession[n],
				sizeof(cf_node));
	}

	// Build the node sequence table.
	for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
		inter_hash h;

		h.hashed_pid = g_hashed_pids[pid];

		for (uint32_t n = 0; n < g_cluster_size; n++) {
			h.hashed_node = hashed_nodes[n];

			cf_node* node_p = &FULL_NODE_SEQ(pid, n);

			*node_p = cf_hash_jen64((const uint8_t*)&h, sizeof(h));

			// Overlay index onto last byte.
			*node_p &= AS_CLUSTER_SZ_MASKP;
			*node_p += n;
		}

		// Sort the hashed node values.
		qsort(&FULL_NODE_SEQ(pid, 0), g_cluster_size, sizeof(cf_node),
				cf_node_compare_desc);

		// Overwrite the sorted hash values with the original node IDs.
		for (uint32_t n = 0; n < g_cluster_size; n++) {
			cf_node* node_p = &FULL_NODE_SEQ(pid, n);
			sl_ix_t sl_ix = (sl_ix_t)(*node_p & AS_CLUSTER_SZ_MASKN);

			*node_p = g_succession[sl_ix];

			// Saved to refer back to partition version table and rack-id list.
			FULL_SL_IX(pid, n) = sl_ix;
		}
	}
}
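
// Worked example (added commentary), assuming a cluster-size bound of 256 so
// that AS_CLUSTER_SZ_MASKP clears the low byte and AS_CLUSTER_SZ_MASKN keeps
// it: if cf_hash_jen64() yields 0x3F29AA10C49BD6A7 for succession index
// n = 2, masking gives 0x3F29AA10C49BD600 and adding n gives
// 0x3F29AA10C49BD602. The low byte rides through qsort() (it matters only on
// ties in the high 56 bits), and *node_p & AS_CLUSTER_SZ_MASKN recovers 2,
// the succession-list index.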

void
balance_namespace_ap(as_namespace* ns, cf_queue* mq)
{
	bool ns_less_than_global = ns->cluster_size != g_cluster_size;

	if (ns_less_than_global) {
		cf_info(AS_PARTITION, "{%s} is on %u of %u nodes", ns->name,
				ns->cluster_size, g_cluster_size);
	}

	// Figure out effective replication factor in the face of node failures.
	apply_single_replica_limit_ap(ns);

	// Active size will be less than cluster size if nodes are quiesced.
	set_active_size(ns);

	uint32_t n_racks = rack_count(ns);

	// If a namespace is not on all nodes or is rack aware or uniform balance
	// is preferred or nodes are quiesced, it can't use the global node
	// sequence and index tables.
	bool ns_not_equal_global = ns_less_than_global || n_racks != 1 ||
			ns->prefer_uniform_balance || ns->active_size != ns->cluster_size;

	// The translation array is used to convert global table rows to namespace
	// rows, if necessary.
	int translation[ns_less_than_global ? g_cluster_size : 0];

	if (ns_less_than_global) {
		fill_translation(translation, ns);
	}

	uint32_t claims_size = ns->prefer_uniform_balance ?
			ns->replication_factor * g_cluster_size : 0;
	uint32_t claims[claims_size];
	uint32_t target_claims[claims_size];

	if (ns->prefer_uniform_balance) {
		memset(claims, 0, sizeof(claims));
		init_target_claims_ap(ns, translation, target_claims);
	}

	uint32_t ns_pending_emigrations = 0;
	uint32_t ns_pending_lead_emigrations = 0;
	uint32_t ns_pending_immigrations = 0;
	uint32_t ns_pending_signals = 0;

	uint32_t ns_fresh_partitions = 0;

	for (uint32_t pid_group = 0; pid_group < NUM_PID_GROUPS; pid_group++) {
		uint32_t start_pid = pid_group * PIDS_PER_GROUP;
		uint32_t end_pid = start_pid + PIDS_PER_GROUP;

		for (uint32_t pid = start_pid; pid < end_pid; pid++) {
			as_partition* p = &ns->partitions[pid];

			cf_node* full_node_seq = &FULL_NODE_SEQ(pid, 0);
			sl_ix_t* full_sl_ix = &FULL_SL_IX(pid, 0);

			// Usually a namespace can simply use the global tables...
			cf_node* ns_node_seq = full_node_seq;
			sl_ix_t* ns_sl_ix = full_sl_ix;

			cf_node stack_node_seq[ns_not_equal_global ? ns->cluster_size : 0];
			sl_ix_t stack_sl_ix[ns_not_equal_global ? ns->cluster_size : 0];

			// ... but sometimes a namespace is different.
			if (ns_not_equal_global) {
				ns_node_seq = stack_node_seq;
				ns_sl_ix = stack_sl_ix;

				fill_namespace_rows(full_node_seq, full_sl_ix, ns_node_seq,
						ns_sl_ix, ns, translation);

				if (ns->active_size != ns->cluster_size) {
					quiesce_adjust_row(ns_node_seq, ns_sl_ix, ns);
				}

				if (ns->prefer_uniform_balance) {
					uniform_adjust_row(ns_node_seq, ns->active_size, ns_sl_ix,
							ns->replication_factor, claims, target_claims,
							ns->rack_ids, n_racks);
				}
				else if (n_racks != 1) {
					rack_aware_adjust_row(ns_node_seq, ns_sl_ix,
							ns->replication_factor, ns->rack_ids,
							ns->active_size, n_racks, 1);
				}
			}

			cf_mutex_lock(&p->lock);

			p->working_master = (cf_node)0;

			p->n_replicas = ns->replication_factor;
			memcpy(p->replicas, ns_node_seq, p->n_replicas * sizeof(cf_node));

			p->n_dupl = 0;

			p->pending_emigrations = 0;
			p->pending_lead_emigrations = 0;
			p->pending_immigrations = 0;

			memset(p->immigrators, 0, ns->replication_factor * sizeof(bool));

			p->n_witnesses = 0;

			uint32_t self_n = find_self(ns_node_seq, ns);

			as_partition_version final_version = {
					.ckey = as_exchange_cluster_key(),
					.master = self_n == 0 ? 1 : 0
			};

			p->final_version = final_version;

			int working_master_n = find_working_master_ap(p, ns_sl_ix, ns);

			uint32_t n_dupl = 0;
			cf_node dupls[ns->cluster_size];

			as_partition_version orig_version = p->version;

			// TEMPORARY debugging.
			uint32_t debug_n_immigrators = 0;

			if (working_master_n == -1) {
				// No existing versions - assign fresh version to replicas.
				working_master_n = 0;

				if (self_n < p->n_replicas) {
					p->version = p->final_version;
				}

				ns_fresh_partitions++;
			}
			else {
				n_dupl = find_duplicates_ap(p, ns_node_seq, ns_sl_ix, ns,
						(uint32_t)working_master_n, dupls);

				uint32_t n_immigrators = fill_immigrators(p, ns_sl_ix, ns,
						(uint32_t)working_master_n, n_dupl);

				// TEMPORARY debugging.
				debug_n_immigrators = n_immigrators;

				if (n_immigrators != 0) {
					// Migrations required - advance versions for next
					// rebalance, queue migrations for this rebalance.

					advance_version_ap(p, ns_sl_ix, ns, self_n,
							(uint32_t)working_master_n, n_dupl, dupls);

					uint32_t lead_flags[ns->replication_factor];

					emig_lead_flags_ap(p, ns_sl_ix, ns, lead_flags);

					queue_namespace_migrations(p, ns, self_n,
							ns_node_seq[working_master_n], n_dupl, dupls,
							lead_flags, mq);

					if (self_n == 0) {
						fill_witnesses(p, ns_node_seq, ns_sl_ix, ns);
						ns_pending_signals += p->n_witnesses;
					}
				}
				else if (self_n < p->n_replicas) {
					// No migrations required - refresh replicas' versions
					// (only truly necessary if replication factor decreased).
					p->version = p->final_version;
				}
				else {
					// No migrations required - drop superfluous non-replica
					// partitions immediately.
					if (! drop_superfluous_version(p, ns)) {
						// Quiesced nodes become subset of final version.
						adjust_superfluous_version(p, ns);
					}
				}
			}

			if (self_n == 0 || self_n == (uint32_t)working_master_n) {
				p->working_master = ns_node_seq[working_master_n];
			}

			handle_version_change(p, ns, &orig_version);

			ns_pending_emigrations += p->pending_emigrations;
			ns_pending_lead_emigrations += p->pending_lead_emigrations;
			ns_pending_immigrations += p->pending_immigrations;

			// TEMPORARY debugging.
			if (pid < 20) {
				cf_debug(AS_PARTITION, "ck%012lX %02u (%hu %hu) %s -> %s - self_n %u wm_n %d repls %u dupls %u immigrators %u",
						as_exchange_cluster_key(), pid, p->pending_emigrations,
						p->pending_immigrations,
						VERSION_AS_STRING(&orig_version),
						VERSION_AS_STRING(&p->version), self_n,
						working_master_n, p->n_replicas, n_dupl,
						debug_n_immigrators);
			}

			client_replica_maps_update(ns, pid);
		}

		// Flush partition metadata for this group of partitions ...
		as_storage_flush_pmeta(ns, start_pid, PIDS_PER_GROUP);

		// ... and unlock the group.
		for (uint32_t pid = start_pid; pid < end_pid; pid++) {
			as_partition* p = &ns->partitions[pid];

			cf_mutex_unlock(&p->lock);
		}
	}

	cf_info(AS_PARTITION, "{%s} rebalanced: expected-migrations (%u,%u,%u) fresh-partitions %u",
			ns->name, ns_pending_emigrations, ns_pending_immigrations,
			ns_pending_signals, ns_fresh_partitions);

	ns->n_unavailable_partitions = 0;

	ns->migrate_tx_partitions_initial = ns_pending_emigrations;
	ns->migrate_tx_partitions_remaining = ns_pending_emigrations;
	ns->migrate_tx_partitions_lead_remaining = ns_pending_lead_emigrations;

	ns->migrate_rx_partitions_initial = ns_pending_immigrations;
	ns->migrate_rx_partitions_remaining = ns_pending_immigrations;

	ns->migrate_signals_remaining = ns_pending_signals;
}
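
// Note on the grouped flush above (added commentary): each PID group stays
// locked from assignment through as_storage_flush_pmeta(), presumably so no
// transaction can act on a partition whose new version is not yet durable;
// only after the flush is the whole group unlocked.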

void
apply_single_replica_limit_ap(as_namespace* ns)
{
	// Replication factor can't be bigger than observed cluster.
	uint32_t repl_factor = ns->cluster_size < ns->cfg_replication_factor ?
			ns->cluster_size : ns->cfg_replication_factor;

	// Reduce the replication factor to 1 if the cluster size is less than or
	// equal to the specified limit.
	ns->replication_factor =
			ns->cluster_size <= g_config.paxos_single_replica_limit ?
					1 : repl_factor;

	cf_info(AS_PARTITION, "{%s} replication factor is %u", ns->name,
			ns->replication_factor);
}
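
// Worked example (added commentary): with cfg_replication_factor 3,
// paxos_single_replica_limit 1 and a 2-node cluster, repl_factor is
// min(2, 3) = 2, and since 2 > 1 the effective replication factor is 2. If
// the cluster shrinks to a single node, 1 <= 1 forces replication factor 1.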

void
fill_translation(int translation[], const as_namespace* ns)
{
	int ns_n = 0;

	for (uint32_t full_n = 0; full_n < g_cluster_size; full_n++) {
		translation[full_n] = ns_n < (int)ns->cluster_size &&
				g_succession[full_n] == ns->succession[ns_n] ? ns_n++ : -1;
	}
}
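
// Worked example (added commentary): with global succession {A, B, C, D} and
// namespace succession {B, D}, the loop yields translation {-1, 0, -1, 1} -
// global rows 1 and 3 map to namespace rows 0 and 1, all others to -1.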

void
fill_namespace_rows(const cf_node* full_node_seq, const sl_ix_t* full_sl_ix,
		cf_node* ns_node_seq, sl_ix_t* ns_sl_ix, const as_namespace* ns,
		const int translation[])
{
	if (ns->cluster_size == g_cluster_size) {
		// Rack-aware but namespace is on all nodes - just copy. Rack-aware
		// will rearrange the copies - we can't rearrange the global originals.
		memcpy(ns_node_seq, full_node_seq, g_cluster_size * sizeof(cf_node));
		memcpy(ns_sl_ix, full_sl_ix, g_cluster_size * sizeof(sl_ix_t));

		return;
	}

	// Fill namespace sequences from global table rows using translation array.
	uint32_t n = 0;

	for (uint32_t full_n = 0; full_n < g_cluster_size; full_n++) {
		int ns_n = translation[full_sl_ix[full_n]];

		if (ns_n != -1) {
			ns_node_seq[n] = ns->succession[ns_n];
			ns_sl_ix[n] = (sl_ix_t)ns_n;
			n++;
		}
	}
}

uint32_t
find_self(const cf_node* ns_node_seq, const as_namespace* ns)
{
	int n = index_of_node(ns_node_seq, ns->cluster_size, g_config.self_node);

	cf_assert(n != -1, AS_PARTITION, "{%s} self node not in succession list",
			ns->name);

	return (uint32_t)n;
}

// Preference: Vm > V > Vs > Ve > Vse > absent. (For compatibility ids below
// 4, Ve is preferred over Vs - see the two scoring schemes below.)
int
find_working_master_ap(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns)
{
	int best_n = -1;
	int best_score = -1;

	for (int n = 0; n < (int)ns->cluster_size; n++) {
		const as_partition_version* version = INPUT_VERSION(n);

		// Skip versions with no data.
		if (! as_partition_version_has_data(version)) {
			continue;
		}

		// If previous working master exists, use it. (There can be more than
		// one after split brains. Also, the flag is only to prevent
		// superfluous master swaps on rebalance when rack-aware.)
		if (version->master == 1) {
			return shift_working_master(p, ns_sl_ix, ns, n, version);
		}
		// else - keep going but remember the best so far.

		int score;

		if (as_exchange_min_compatibility_id() >= 4) {
			// V = 3 > Vs = 2 > Ve = 1 > Vse = 0.
			score = 3 - ((version->subset == 1 ? 1 : 0) +
					(version->evade == 1 ? 2 : 0));
		}
		else {
			// V = 3 > Ve = 2 > Vs = 1 > Vse = 0.
			score = (version->evade == 1 ? 0 : 1) +
					(version->subset == 1 ? 0 : 2);
		}

		if (score > best_score) {
			best_score = score;
			best_n = n;
		}
	}

	return best_n;
}
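
// Scoring example (added commentary): under the newer scheme a full version V
// scores 3 - (0 + 0) = 3, a subset Vs scores 3 - (1 + 0) = 2, an evade Ve
// scores 3 - (0 + 2) = 1, and Vse scores 3 - (1 + 2) = 0 - so a
// master-flagged version short-circuits the scan, and otherwise the least
// degraded version with data wins.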

int
shift_working_master(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns, int working_master_n,
		const as_partition_version* working_master_version)
{
	if (working_master_n == 0 || working_master_version->subset == 1) {
		return working_master_n; // can only shift full masters
	}

	for (int n = 0; n < working_master_n; n++) {
		const as_partition_version* version = INPUT_VERSION(n);

		if (is_same_as_full_master(working_master_version, version)) {
			return n; // master flag will get shifted later
		}
	}

	return working_master_n;
}

uint32_t
find_duplicates_ap(const as_partition* p, const cf_node* ns_node_seq,
		const sl_ix_t* ns_sl_ix, const as_namespace* ns,
		uint32_t working_master_n, cf_node dupls[])
{
	uint32_t n_dupl = 0;
	as_partition_version parent_dupl_versions[ns->cluster_size];

	memset(parent_dupl_versions, 0, sizeof(parent_dupl_versions));

	for (uint32_t n = 0; n < ns->cluster_size; n++) {
		const as_partition_version* version = INPUT_VERSION(n);

		// Skip versions without data, and postpone subsets to next pass.
		if (! as_partition_version_has_data(version) || version->subset == 1) {
			continue;
		}

		// Every unique version is a duplicate.
		if (version->family == VERSION_FAMILY_UNIQUE) {
			dupls[n_dupl++] = ns_node_seq[n];
			continue;
		}

		// Add parent versions as duplicates, unless they are already in.

		uint32_t d;

		for (d = 0; d < n_dupl; d++) {
			if (is_family_same(&parent_dupl_versions[d], version)) {
				break;
			}
		}

		if (d == n_dupl) {
			// Not in dupls.
			parent_dupl_versions[n_dupl] = *version;
			dupls[n_dupl++] = ns_node_seq[n];
		}
	}

	// Second pass to deal with subsets.
	for (uint32_t n = 0; n < ns->cluster_size; n++) {
		const as_partition_version* version = INPUT_VERSION(n);

		if (version->subset == 0) {
			continue;
		}

		uint32_t d;

		for (d = 0; d < n_dupl; d++) {
			if (is_family_same(&parent_dupl_versions[d], version)) {
				break;
			}
		}

		if (d == n_dupl) {
			// Not in dupls.
			// Leave 0 in parent_dupl_versions array.
			dupls[n_dupl++] = ns_node_seq[n];
		}
	}

	// Remove working master from 'variants' to leave duplicates.
	return remove_node(dupls, n_dupl, ns_node_seq[working_master_n]);
}
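
// Worked example (added commentary): in a 4-node row where node 0 has full
// version F1, node 1 has full version F2, node 2 has a subset of F1, and
// node 3 is empty - pass one collects {node0, node1} as parent versions;
// pass two skips node 2 (its family matches F1, already in) and node 3 (no
// data, so subset is 0). If node 0 is the working master, it is removed,
// leaving node 1 as the lone duplicate.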

uint32_t
fill_immigrators(as_partition* p, const sl_ix_t* ns_sl_ix, as_namespace* ns,
		uint32_t working_master_n, uint32_t n_dupl)
{
	uint32_t n_immigrators = 0;

	for (uint32_t repl_ix = 0; repl_ix < p->n_replicas; repl_ix++) {
		const as_partition_version* version = INPUT_VERSION(repl_ix);

		if (n_dupl != 0 || (repl_ix != working_master_n &&
				(! as_partition_version_has_data(version) ||
						version->subset == 1))) {
			p->immigrators[repl_ix] = true;
			n_immigrators++;
		}
	}

	return n_immigrators;
}

void
advance_version_ap(as_partition* p, const sl_ix_t* ns_sl_ix, as_namespace* ns,
		uint32_t self_n, uint32_t working_master_n, uint32_t n_dupl,
		const cf_node dupls[])
{
	// Advance working master.
	if (self_n == working_master_n) {
		p->version.ckey = p->final_version.ckey;
		p->version.family = (self_n == 0 || n_dupl == 0) ? 0 : 1;
		p->version.master = 1;
		p->version.subset = 0;
		p->version.evade = 0;

		return;
	}

	p->version.master = 0;

	bool self_is_versionless = ! as_partition_version_has_data(&p->version);

	// Advance eventual master.
	if (self_n == 0) {
		p->version.ckey = p->final_version.ckey;
		p->version.family = 0;
		p->version.subset = n_dupl == 0 ? 1 : 0;

		if (self_is_versionless) {
			p->version.evade = 1;
		}
		// else - don't change evade flag.

		return;
	}

	// Advance version-less proles and non-replicas (common case).
	if (self_is_versionless) {
		if (self_n < p->n_replicas) {
			p->version.ckey = p->final_version.ckey;
			p->version.family = 0;
			p->version.subset = 1;
			p->version.evade = 1;
		}
		// else - non-replicas remain version-less.

		return;
	}

	// Fill family versions.

	uint32_t max_n_families = p->n_replicas + 1;

	if (max_n_families > AS_PARTITION_N_FAMILIES) {
		max_n_families = AS_PARTITION_N_FAMILIES;
	}

	as_partition_version family_versions[max_n_families];
	uint32_t n_families = fill_family_versions(p, ns_sl_ix, ns,
			working_master_n, n_dupl, dupls, family_versions);

	uint32_t family = find_family(&p->version, n_families, family_versions);

	// Advance non-masters with prior versions ...

	// ... proles ...
	if (self_n < p->n_replicas) {
		p->version.ckey = p->final_version.ckey;
		p->version.family = family;

		if (n_dupl != 0 && p->version.family == 0) {
			p->version.subset = 1;
		}
		// else - don't change either subset or evade flag.

		return;
	}

	// ... or non-replicas.
	if (family != VERSION_FAMILY_UNIQUE &&
			family_versions[family].subset == 0) {
		p->version.ckey = p->final_version.ckey;
		p->version.family = family;
		p->version.subset = 1;
	}
	// else - leave version as-is.
}

uint32_t
fill_family_versions(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns, uint32_t working_master_n, uint32_t n_dupl,
		const cf_node dupls[], as_partition_version family_versions[])
{
	uint32_t n_families = 1;
	const as_partition_version* final_master_version = INPUT_VERSION(0);

	family_versions[0] = *final_master_version;

	if (working_master_n != 0) {
		const as_partition_version* working_master_version =
				INPUT_VERSION(working_master_n);

		if (n_dupl == 0) {
			family_versions[0] = *working_master_version;
		}
		else {
			family_versions[0] = p->final_version; // not matchable
			family_versions[1] = *working_master_version;
			n_families = 2;
		}
	}

	for (uint32_t repl_ix = 1;
			repl_ix < p->n_replicas && n_families < AS_PARTITION_N_FAMILIES;
			repl_ix++) {
		if (repl_ix == working_master_n) {
			continue;
		}

		const as_partition_version* version = INPUT_VERSION(repl_ix);

		if (contains_node(dupls, n_dupl, p->replicas[repl_ix])) {
			family_versions[n_families++] = *version;
		}
		else if (version->subset == 1 &&
				! has_replica_parent(p, ns_sl_ix, ns, version, repl_ix)) {
			family_versions[n_families++] = *version;
		}
	}

	return n_families;
}

bool
has_replica_parent(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns, const as_partition_version* subset_version,
		uint32_t subset_n)
{
	for (uint32_t repl_ix = 1; repl_ix < p->n_replicas; repl_ix++) {
		if (repl_ix == subset_n) {
			continue;
		}

		const as_partition_version* version = INPUT_VERSION(repl_ix);

		if (version->subset == 0 && is_family_same(version, subset_version)) {
			return true;
		}
	}

	return false;
}

uint32_t
find_family(const as_partition_version* self_version, uint32_t n_families,
		const as_partition_version family_versions[])
{
	for (uint32_t n = 0; n < n_families; n++) {
		if (is_family_same(self_version, &family_versions[n])) {
			return n;
		}
	}

	return VERSION_FAMILY_UNIQUE;
}

void
queue_namespace_migrations(as_partition* p, as_namespace* ns, uint32_t self_n,
		cf_node working_master, uint32_t n_dupl, cf_node dupls[],
		const uint32_t lead_flags[], cf_queue* mq)
{
	pb_task task;

	if (self_n == 0) {
		// <><><><><><>  Final Master  <><><><><><>

		if (g_config.self_node == working_master) {
			p->pending_immigrations = (uint16_t)n_dupl;
		}
		else {
			// Remove self from duplicates.
			n_dupl = remove_node(dupls, n_dupl, g_config.self_node);

			p->pending_immigrations = (uint16_t)n_dupl + 1;
		}

		if (n_dupl != 0) {
			p->n_dupl = n_dupl;
			memcpy(p->dupls, dupls, n_dupl * sizeof(cf_node));
		}

		if (p->pending_immigrations != 0) {
			for (uint32_t repl_ix = 1; repl_ix < p->n_replicas; repl_ix++) {
				if (p->immigrators[repl_ix]) {
					p->pending_emigrations++;
				}
			}

			// Emigrate later, after all immigration is complete.
			return;
		}

		// Emigrate now, no immigrations to wait for.
		for (uint32_t repl_ix = 1; repl_ix < p->n_replicas; repl_ix++) {
			if (p->immigrators[repl_ix]) {
				p->pending_emigrations++;

				if (lead_flags[repl_ix] != TX_FLAGS_NONE) {
					p->pending_lead_emigrations++;
				}

				pb_task_init(&task, p->replicas[repl_ix], ns, p->id,
						as_exchange_cluster_key(), PB_TASK_EMIG_TRANSFER,
						lead_flags[repl_ix]);
				cf_queue_push(mq, &task);
			}
		}

		return;
	}
	// else - <><><><><><>  Not Final Master  <><><><><><>

	if (g_config.self_node == working_master) {
		if (n_dupl != 0) {
			p->n_dupl = n_dupl;
			memcpy(p->dupls, dupls, n_dupl * sizeof(cf_node));
		}

		p->pending_emigrations = 1;

		if (lead_flags[0] != TX_FLAGS_NONE) {
			p->pending_lead_emigrations = 1;
		}

		pb_task_init(&task, p->replicas[0], ns, p->id,
				as_exchange_cluster_key(), PB_TASK_EMIG_TRANSFER,
				TX_FLAGS_ACTING_MASTER | lead_flags[0]);
		cf_queue_push(mq, &task);
	}
	else if (contains_self(dupls, n_dupl)) {
		p->pending_emigrations = 1;

		if (lead_flags[0] != TX_FLAGS_NONE) {
			p->pending_lead_emigrations = 1;
		}

		pb_task_init(&task, p->replicas[0], ns, p->id,
				as_exchange_cluster_key(), PB_TASK_EMIG_TRANSFER,
				lead_flags[0]);
		cf_queue_push(mq, &task);
	}

	if (self_n < p->n_replicas && p->immigrators[self_n]) {
		p->pending_immigrations = 1;
	}
}
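
// Scenario sketch (added commentary): with replicas {A, B} where only B holds
// a full version, final master A queues no emigrations yet - it first expects
// one immigration from acting master B. B, the working but not final master,
// queues one emigration to A tagged TX_FLAGS_ACTING_MASTER. Once A's
// immigration completes, as_partition_immigrate_done() queues any deferred
// transfers - or, with none pending, all-done signals to the witnesses.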

void
fill_witnesses(as_partition* p, const cf_node* ns_node_seq,
		const sl_ix_t* ns_sl_ix, as_namespace* ns)
{
	for (uint32_t n = 1; n < ns->cluster_size; n++) {
		const as_partition_version* version = INPUT_VERSION(n);

		// Note - 0e/0r versions (CP) are witnesses.
		if (n < p->n_replicas || ! as_partition_version_is_null(version)) {
			p->witnesses[p->n_witnesses++] = ns_node_seq[n];
		}
	}
}

// If version changed, create/drop trees as appropriate, and cache for storage.
void
handle_version_change(as_partition* p, as_namespace* ns,
		as_partition_version* orig_version)
{
	if (as_partition_version_same(&p->version, orig_version)) {
		return;
	}

	if (! as_partition_version_has_data(orig_version) &&
			as_partition_version_has_data(&p->version)) {
		create_trees(p, ns);
	}

	if (as_partition_version_has_data(orig_version) &&
			! as_partition_version_has_data(&p->version)) {
		// FIXME - temporary paranoia.
		cf_assert(p->tree, AS_PARTITION, "unexpected - null tree");
		drop_trees(p);
	}

	as_storage_cache_pmeta(ns, p);
}


//==========================================================
// Local helpers - migration-related as_partition methods.
//

// Sanity checks for immigration commands.
bool
partition_immigration_is_valid(const as_partition* p, cf_node source_node,
		const as_namespace* ns, const char* tag)
{
	const char* failure_reason = NULL;

	if (p->pending_immigrations == 0) {
		failure_reason = "no immigrations expected";
	}
	else if (is_self_final_master(p)) {
		if (source_node != p->working_master &&
				! contains_node(p->dupls, p->n_dupl, source_node)) {
			failure_reason = "final master's source not acting master or duplicate";
		}
	}
	else if (source_node != p->replicas[0]) {
		failure_reason = "prole's source not final working master";
	}

	if (failure_reason) {
		cf_warning(AS_PARTITION, "{%s:%u} immigrate_%s - source %lx working-master %lx pending-immigrations %hu - %s",
				ns->name, p->id, tag, source_node, p->working_master,
				p->pending_immigrations, failure_reason);

		return false;
	}

	return true;
}

void
emigrate_done_advance_non_master_version_ap(as_namespace* ns, as_partition* p,
		uint32_t tx_flags)
{
	if ((tx_flags & TX_FLAGS_ACTING_MASTER) != 0) {
		p->working_master = (cf_node)0;
		p->n_dupl = 0;
		p->version.master = 0;
	}

	p->version.ckey = p->final_version.ckey;
	p->version.family = 0;

	if (p->pending_immigrations != 0 || ! is_self_replica(p)) {
		p->version.subset = 1;
	}
	// else - must already be a parent.

	as_storage_save_pmeta(ns, p);
}

void
immigrate_start_advance_non_master_version_ap(as_partition* p)
{
	// Become subset of final version if not already such.
	if (! (p->version.ckey == p->final_version.ckey &&
			p->version.family == 0 && p->version.subset == 1)) {
		p->version.ckey = p->final_version.ckey;
		p->version.family = 0;
		p->version.master = 0; // racing emigrate done if we were acting master
		p->version.subset = 1;
		// Leave evade flag as-is.
	}
}

void
immigrate_done_advance_final_master_version_ap(as_namespace* ns,
		as_partition* p)
{
	if (! as_partition_version_same(&p->version, &p->final_version)) {
		p->version = p->final_version;
		as_storage_save_pmeta(ns, p);
	}
}