/*
 * partition.c
 *
 * Copyright (C) 2008-2018 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/
 */

//==========================================================
// Includes.
//

#include "fabric/partition.h"

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_atomic.h"
#include "citrusleaf/cf_b64.h"

#include "cf_mutex.h"
#include "fault.h"
#include "node.h"

#include "base/cfg.h"
#include "base/datamodel.h"
#include "base/index.h"
#include "base/proto.h"
#include "base/transaction.h"
#include "fabric/partition_balance.h"


//==========================================================
// Forward declarations.
//

cf_node find_best_node(const as_partition* p, bool is_read);
void accumulate_replica_stats(const as_partition* p, uint64_t* p_n_objects,
		uint64_t* p_n_tombstones);
void partition_reserve_lockfree(as_partition* p, as_namespace* ns,
		as_partition_reservation* rsv);
char partition_descriptor(const as_partition* p);
int partition_get_replica_self_lockfree(const as_namespace* ns, uint32_t pid);
bool should_working_master_own(const as_namespace* ns, uint32_t pid,
		uint32_t repl_ix);


//==========================================================
// Public API.
//

void
as_partition_init(as_namespace* ns, uint32_t pid)
{
	as_partition* p = &ns->partitions[pid];

	// Note - as_partition has been zeroed since it's a member of as_namespace.
	// Set non-zero members.

	cf_mutex_init(&p->lock);

	p->id = pid;

	if (ns->cold_start) {
		return; // trees are created later when we know which ones we own
	}

	p->tree = as_index_tree_resume(&ns->tree_shared, ns->xmem_trees, pid,
			as_partition_tree_done, (void*)p);
}

void
as_partition_shutdown(as_namespace* ns, uint32_t pid)
{
	as_partition* p = &ns->partitions[pid];

	while (true) {
		cf_mutex_lock(&p->lock);

		as_index_tree* tree = p->tree;
		as_index_tree_reserve(tree);

		cf_mutex_unlock(&p->lock);

		// Must come outside partition lock, since transactions may take
		// partition lock under the record (sprig) lock.
		as_index_tree_block(tree);

		// If lucky, the tree wasn't switched - keep the partition locked and
		// complete shutdown.
		cf_mutex_lock(&p->lock);

		if (tree == p->tree) {
			break; // lucky - same tree we blocked
		}

		// Bad luck - blocked a tree that just got switched, block the new one.
		cf_mutex_unlock(&p->lock);
	}
}

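// Reset ownership, replica list, and pending migration counts. Note - this
// leaves the partition's version, regime, and tree untouched.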
void
as_partition_freeze(as_partition* p)
{
	p->working_master = (cf_node)0;

	p->n_nodes = 0;
	p->n_replicas = 0;
	p->n_dupl = 0;

	p->pending_emigrations = 0;
	p->pending_lead_emigrations = 0;
	p->pending_immigrations = 0;

	p->n_witnesses = 0;
}

// Get a list of all nodes (excluding self) that are replicas for a specified
// partition: place the list in *nv and return the number of nodes found.
uint32_t
as_partition_get_other_replicas(as_partition* p, cf_node* nv)
{
	uint32_t n_other_replicas = 0;

	cf_mutex_lock(&p->lock);

	for (uint32_t repl_ix = 0; repl_ix < p->n_replicas; repl_ix++) {
		// Don't ever include yourself.
		if (p->replicas[repl_ix] == g_config.self_node) {
			continue;
		}

		// Copy the node ID into the user-supplied vector.
		nv[n_other_replicas++] = p->replicas[repl_ix];
	}

	cf_mutex_unlock(&p->lock);

	return n_other_replicas;
}

cf_node
as_partition_writable_node(as_namespace* ns, uint32_t pid)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (p->n_replicas == 0) {
		// This partition is unavailable.
		cf_mutex_unlock(&p->lock);
		return (cf_node)0;
	}

	cf_node best_node = find_best_node(p, false);

	cf_mutex_unlock(&p->lock);

	return best_node;
}

// If this node is an eventual master, return the acting master, else return 0.
cf_node
as_partition_proxyee_redirect(as_namespace* ns, uint32_t pid)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	cf_node node = (cf_node)0;

	if (g_config.self_node == p->replicas[0] &&
			g_config.self_node != p->working_master) {
		node = p->working_master;
	}

	cf_mutex_unlock(&p->lock);

	return node;
}

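// Info output - one "<ns-name>:<b64-bitmap>;" token per namespace, using only
// the master map (replica column 0), with the final ';' chomped. E.g., for
// namespaces test and bar (b64 payloads elided):
//
//   test:<b64map>;bar:<b64map>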
void
as_partition_get_replicas_master_str(cf_dyn_buf* db)
{
	size_t db_sz = db->used_sz;

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		cf_dyn_buf_append_string(db, ns->name);
		cf_dyn_buf_append_char(db, ':');
		cf_dyn_buf_append_buf(db, (uint8_t*)ns->replica_maps[0].b64map,
				sizeof(ns->replica_maps[0].b64map));
		cf_dyn_buf_append_char(db, ';');
	}

	if (db_sz != db->used_sz) {
		cf_dyn_buf_chomp(db);
	}
}

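// As above, but reporting every replica column per namespace:
//
//   <ns-name>:[<regime>,]<repl-factor>,<b64map-0>,...,<b64map-(rf-1)>;
//
// where <regime> appears only if include_regime is true.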
void
as_partition_get_replicas_all_str(cf_dyn_buf* db, bool include_regime)
{
	size_t db_sz = db->used_sz;

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		cf_dyn_buf_append_string(db, ns->name);
		cf_dyn_buf_append_char(db, ':');

		if (include_regime) {
			cf_dyn_buf_append_uint32(db, ns->rebalance_regime);
			cf_dyn_buf_append_char(db, ',');
		}

		uint32_t repl_factor = ns->replication_factor;

		// If we haven't rebalanced yet, report 1 column with no ownership.
		if (repl_factor == 0) {
			repl_factor = 1;
		}

		cf_dyn_buf_append_uint32(db, repl_factor);

		for (uint32_t repl_ix = 0; repl_ix < repl_factor; repl_ix++) {
			cf_dyn_buf_append_char(db, ',');
			cf_dyn_buf_append_buf(db,
					(uint8_t*)&ns->replica_maps[repl_ix].b64map,
					sizeof(ns->replica_maps[repl_ix].b64map));
		}

		cf_dyn_buf_append_char(db, ';');
	}

	if (db_sz != db->used_sz) {
		cf_dyn_buf_chomp(db);
	}
}

void
as_partition_get_replica_stats(as_namespace* ns, repl_stats* p_stats)
{
	memset(p_stats, 0, sizeof(repl_stats));

	for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
		as_partition* p = &ns->partitions[pid];

		cf_mutex_lock(&p->lock);

		if (g_config.self_node == p->working_master) {
			accumulate_replica_stats(p,
					&p_stats->n_master_objects,
					&p_stats->n_master_tombstones);
		}
		else if (find_self_in_replicas(p) >= 0) { // -1 if not
			accumulate_replica_stats(p,
					&p_stats->n_prole_objects,
					&p_stats->n_prole_tombstones);
		}
		else {
			accumulate_replica_stats(p,
					&p_stats->n_non_replica_objects,
					&p_stats->n_non_replica_tombstones);
		}

		cf_mutex_unlock(&p->lock);
	}
}

// TODO - what if partition is unavailable?
void
as_partition_reserve(as_namespace* ns, uint32_t pid,
		as_partition_reservation* rsv)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	partition_reserve_lockfree(p, ns, rsv);

	cf_mutex_unlock(&p->lock);
}

int
as_partition_reserve_replica(as_namespace* ns, uint32_t pid,
		as_partition_reservation* rsv)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! is_self_replica(p)) {
		cf_mutex_unlock(&p->lock);
		return AS_ERR_CLUSTER_KEY_MISMATCH;
	}

	partition_reserve_lockfree(p, ns, rsv);

	cf_mutex_unlock(&p->lock);

	return AS_OK;
}

// Returns:
// 0 - reserved - node parameter returns self node
// -1 - not reserved - node parameter returns other "better" node
// -2 - not reserved - node parameter not filled - partition is unavailable
int
as_partition_reserve_write(as_namespace* ns, uint32_t pid,
		as_partition_reservation* rsv, cf_node* node)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	// If this partition is frozen, return.
	if (p->n_replicas == 0) {
		if (node) {
			*node = (cf_node)0;
		}

		cf_mutex_unlock(&p->lock);
		return -2;
	}

	cf_node best_node = find_best_node(p, false);

	if (node) {
		*node = best_node;
	}

	// If this node is not the appropriate one, return.
	if (best_node != g_config.self_node) {
		cf_mutex_unlock(&p->lock);
		return -1;
	}

	partition_reserve_lockfree(p, ns, rsv);

	cf_mutex_unlock(&p->lock);

	return 0;
}

// Returns:
// 0 - reserved - node parameter returns self node
// -1 - not reserved - node parameter returns other "better" node
// -2 - not reserved - node parameter not filled - partition is unavailable
int
as_partition_reserve_read_tr(as_namespace* ns, uint32_t pid, as_transaction* tr,
		cf_node* node)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	// Handle unavailable partition.
	if (p->n_replicas == 0) {
		int result = partition_reserve_unavailable(ns, p, tr, node);

		if (result == 0) {
			partition_reserve_lockfree(p, ns, &tr->rsv);
		}

		cf_mutex_unlock(&p->lock);
		return result;
	}

	cf_node best_node = find_best_node(p,
			! partition_reserve_promote(ns, p, tr));

	if (node) {
		*node = best_node;
	}

	// If this node is not the appropriate one, return.
	if (best_node != g_config.self_node) {
		cf_mutex_unlock(&p->lock);
		return -1;
	}

	partition_reserve_lockfree(p, ns, &tr->rsv);

	cf_mutex_unlock(&p->lock);

	return 0;
}

// Reserves all query-able partitions.
// Returns the number of partitions reserved.
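// Typical usage - a sketch, not from the original source:
//
//	bool can_query[AS_PARTITIONS];
//	as_partition_reservation rsvs[AS_PARTITIONS];
//
//	int n_reserved = as_partition_prereserve_query(ns, can_query, rsvs);
//
//	// ... query the partitions that were reserved ...
//
//	for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
//		if (can_query[pid]) {
//			as_partition_release(&rsvs[pid]);
//		}
//	}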
int
as_partition_prereserve_query(as_namespace* ns, bool can_partition_query[],
		as_partition_reservation rsv[])
{
	int reserved = 0;

	for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
		if (as_partition_reserve_query(ns, pid, &rsv[pid])) {
			can_partition_query[pid] = false;
		}
		else {
			can_partition_query[pid] = true;
			reserved++;
		}
	}

	return reserved;
}

// Reserve a partition for query.
// Return value 0 means the reservation was taken - non-zero means not (-1 or
// -2, as for as_partition_reserve_write()).
int
as_partition_reserve_query(as_namespace* ns, uint32_t pid,
		as_partition_reservation* rsv)
{
	return as_partition_reserve_write(ns, pid, rsv, NULL);
}

// Succeeds if we are full working master or prole.
int
as_partition_reserve_xdr_read(as_namespace* ns, uint32_t pid,
		as_partition_reservation* rsv)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	int res = -1;

	if (p->pending_immigrations == 0 &&
			(g_config.self_node == p->working_master ||
					find_self_in_replicas(p) >= 0)) {
		partition_reserve_lockfree(p, ns, rsv);
		res = 0;
	}

	cf_mutex_unlock(&p->lock);

	return res;
}

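// Note - copies the tree pointer without taking an extra ref on the tree.
// Presumably the caller either guarantees the source reservation outlives the
// copy, or reserves the tree itself.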
void
as_partition_reservation_copy(as_partition_reservation* dst,
		as_partition_reservation* src)
{
	dst->ns = src->ns;
	dst->p = src->p;
	dst->tree = src->tree;
	dst->regime = src->regime;
	dst->n_dupl = src->n_dupl;

	if (dst->n_dupl != 0) {
		memcpy(dst->dupl_nodes, src->dupl_nodes, sizeof(cf_node) * dst->n_dupl);
	}
}

void
as_partition_release(as_partition_reservation* rsv)
{
	as_index_tree_release(rsv->tree);
}

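// Tree-ids live in a small ring - (p->tree_id + 1) & TREE_ID_MASK - and ids
// still claimed by dropped-but-not-yet-destroyed trees are tracked in the
// 64-bit tree_ids_used bitmap, so MAX_NUM_TREE_IDS is presumably 64 with
// TREE_ID_MASK 0x3F. Worked example, assuming those values: if tree_id is 63
// and id 0 is free, the loop wraps to claim id 0 with n_hanging 0.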
void
as_partition_advance_tree_id(as_partition* p, const char* ns_name)
{
	uint32_t n_hanging;

	// Find first available tree-id past current one. Should be very next one.
	for (n_hanging = 0; n_hanging < MAX_NUM_TREE_IDS; n_hanging++) {
		p->tree_id = (p->tree_id + 1) & TREE_ID_MASK;

		uint64_t id_mask = 1UL << p->tree_id;

		if ((p->tree_ids_used & id_mask) == 0) {
			// Claim tree-id. Claim is relinquished when tree is destroyed.
			p->tree_ids_used |= id_mask;
			break;
		}
	}

	// If no available tree-ids, just stop. Should never happen.
	if (n_hanging == MAX_NUM_TREE_IDS) {
		cf_crash(AS_PARTITION, "{%s} pid %u has %u dropped trees hanging",
				ns_name, p->id, n_hanging);
	}

	// Too many hanging trees - ref-count leak? Offer chance to warm restart.
	if (n_hanging > MAX_NUM_TREE_IDS / 2) {
		cf_warning(AS_PARTITION, "{%s} pid %u has %u dropped trees hanging",
				ns_name, p->id, n_hanging);
	}
}

// Callback made when dropped as_index_tree is finally destroyed.
void
as_partition_tree_done(uint8_t id, void* udata)
{
	as_partition* p = (as_partition*)udata;

	cf_mutex_lock(&p->lock);

	// Relinquish tree-id.
	p->tree_ids_used &= ~(1UL << id);

	cf_mutex_unlock(&p->lock);
}

void
as_partition_getinfo_str(cf_dyn_buf* db)
{
	size_t db_sz = db->used_sz;

	cf_dyn_buf_append_string(db, "namespace:partition:state:n_replicas:replica:"
			"n_dupl:working_master:emigrates:lead_emigrates:immigrates:records:"
			"tombstones:regime:version:final_version;");

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
			as_partition* p = &ns->partitions[pid];

			cf_mutex_lock(&p->lock);

			cf_dyn_buf_append_string(db, ns->name);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, pid);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_char(db, partition_descriptor(p));
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->n_replicas);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_int(db, find_self_in_replicas(p));
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->n_dupl);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint64_x(db, p->working_master);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->pending_emigrations);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->pending_lead_emigrations);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->pending_immigrations);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, as_index_tree_size(p->tree));
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint64(db, p->n_tombstones);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->regime);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_string(db, VERSION_AS_STRING(&p->version));
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_string(db, VERSION_AS_STRING(&p->final_version));

			cf_dyn_buf_append_char(db, ';');

			cf_mutex_unlock(&p->lock);
		}
	}

	if (db_sz != db->used_sz) {
		cf_dyn_buf_chomp(db); // take back the final ';'
	}
}


//==========================================================
// Public API - client view replica maps.
//

void
client_replica_maps_create(as_namespace* ns)
{
	uint32_t size = sizeof(client_replica_map) * ns->cfg_replication_factor;

	ns->replica_maps = cf_malloc(size);
	memset(ns->replica_maps, 0, size);

	for (uint32_t repl_ix = 0; repl_ix < ns->cfg_replication_factor;
			repl_ix++) {
		client_replica_map* repl_map = &ns->replica_maps[repl_ix];

		cf_mutex_init(&repl_map->write_lock);

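		// Encode the (still all-zero) bitmap up front, so b64map always holds
		// valid base64 - even before the first rebalance updates it.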
		cf_b64_encode((uint8_t*)repl_map->bitmap,
				(uint32_t)sizeof(repl_map->bitmap), (char*)repl_map->b64map);
	}
}

void
client_replica_maps_clear(as_namespace* ns)
{
	memset(ns->replica_maps, 0,
			sizeof(client_replica_map) * ns->cfg_replication_factor);

	for (uint32_t repl_ix = 0; repl_ix < ns->cfg_replication_factor;
			repl_ix++) {
		client_replica_map* repl_map = &ns->replica_maps[repl_ix];

		cf_b64_encode((uint8_t*)repl_map->bitmap,
				(uint32_t)sizeof(repl_map->bitmap), (char*)repl_map->b64map);
	}
}

bool
client_replica_maps_update(as_namespace* ns, uint32_t pid)
{
	uint32_t byte_i = pid >> 3;
	uint32_t byte_chunk = (byte_i / 3);
	uint32_t chunk_bitmap_offset = byte_chunk * 3;
	uint32_t chunk_b64map_offset = byte_chunk << 2;
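	// Base64 encodes each 3-byte bitmap chunk as 4 output characters, so only
	// the chunk containing this pid's bit needs re-encoding. Worked example
	// (arithmetic only): pid 1000 -> byte_i 125, byte_chunk 41,
	// chunk_bitmap_offset 123, chunk_b64map_offset 164.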

	uint32_t bytes_from_end = CLIENT_BITMAP_BYTES - chunk_bitmap_offset;
	uint32_t input_size = bytes_from_end > 3 ? 3 : bytes_from_end;

	int replica = partition_get_replica_self_lockfree(ns, pid); // -1 if not
	uint8_t set_mask = 0x80 >> (pid & 0x7);
	bool changed = false;

	for (int repl_ix = 0; repl_ix < (int)ns->cfg_replication_factor;
			repl_ix++) {
		client_replica_map* repl_map = &ns->replica_maps[repl_ix];

		volatile uint8_t* mbyte = repl_map->bitmap + byte_i;

		bool owned = replica == repl_ix ||
				// Working master also owns all immigrating prole columns,
				// and, if it's an acting master in a prole column, that
				// column too (e.g. when it's full and migrating to a newly
				// added final master).
				(replica == 0 && // am working master
						should_working_master_own(ns, pid, repl_ix));

		bool is_set = (*mbyte & set_mask) != 0;
		bool needs_update = (owned && ! is_set) || (! owned && is_set);

		if (! needs_update) {
			continue;
		}

		volatile uint8_t* bitmap_chunk = repl_map->bitmap + chunk_bitmap_offset;
		volatile char* b64map_chunk = repl_map->b64map + chunk_b64map_offset;

		cf_mutex_lock(&repl_map->write_lock);

		*mbyte ^= set_mask;
		cf_b64_encode((uint8_t*)bitmap_chunk, input_size, (char*)b64map_chunk);

		cf_mutex_unlock(&repl_map->write_lock);

		changed = true;
	}

	return changed;
}

bool
client_replica_maps_is_partition_queryable(const as_namespace* ns, uint32_t pid)
{
	uint32_t byte_i = pid >> 3;

	const client_replica_map* repl_map = ns->replica_maps;
	const volatile uint8_t* mbyte = repl_map->bitmap + byte_i;

	uint8_t set_mask = 0x80 >> (pid & 0x7);

	return (*mbyte & set_mask) != 0;
}


//==========================================================
// Local helpers.
//

// Find best node to handle read/write. Called within partition lock.
cf_node
find_best_node(const as_partition* p, bool is_read)
{
	// Working master (final or acting) returns self, eventual master returns
	// acting master. Others don't have p->working_master set.
	if (p->working_master != (cf_node)0) {
		return p->working_master;
	}

	if (is_read && p->pending_immigrations == 0 &&
			find_self_in_replicas(p) > 0) {
		return g_config.self_node; // may read from prole that's got everything
	}

	return p->replicas[0]; // final master as a last resort
}

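// Note - tree size and n_tombstones are sampled without synchronization, so
// their difference can transiently be negative - hence the clamp to zero.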
void
accumulate_replica_stats(const as_partition* p, uint64_t* p_n_objects,
		uint64_t* p_n_tombstones)
{
	int64_t n_tombstones = (int64_t)p->n_tombstones;
	int64_t n_objects = (int64_t)as_index_tree_size(p->tree) - n_tombstones;

	*p_n_objects += n_objects > 0 ? (uint64_t)n_objects : 0;
	*p_n_tombstones += (uint64_t)n_tombstones;
}

void
partition_reserve_lockfree(as_partition* p, as_namespace* ns,
		as_partition_reservation* rsv)
{
	as_index_tree_reserve(p->tree);

	rsv->ns = ns;
	rsv->p = p;
	rsv->tree = p->tree;
	rsv->regime = p->regime;
	rsv->n_dupl = p->n_dupl;

	if (rsv->n_dupl != 0) {
		memcpy(rsv->dupl_nodes, p->dupls, sizeof(cf_node) * rsv->n_dupl);
	}
}

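// Single-character partition state for the info string. Our reading of the
// cases (letter meanings are not documented in this file): 'S' = sync
// replica, 'D' = desync (replica still immigrating), 'A' = absent (null
// version), 'Z' = zombie (non-replica still holding data), 'X' = non-replica
// with a version but no data.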
char
partition_descriptor(const as_partition* p)
{
	if (find_self_in_replicas(p) >= 0) { // -1 if not
		return p->pending_immigrations == 0 ? 'S' : 'D';
	}

	if (as_partition_version_is_null(&p->version)) {
		return 'A';
	}

	return as_partition_version_has_data(&p->version) ? 'Z' : 'X';
}

int
partition_get_replica_self_lockfree(const as_namespace* ns, uint32_t pid)
{
	const as_partition* p = &ns->partitions[pid];

	if (g_config.self_node == p->working_master) {
		return 0;
	}

	int self_n = find_self_in_replicas(p); // -1 if not

	if (self_n > 0 && p->pending_immigrations == 0 &&
			// Check self_n < n_repl only because n_repl could be out-of-sync
			// with (less than) partition's replica list count.
			self_n < (int)ns->replication_factor) {
		return self_n;
	}

	return -1; // not a replica
}

bool
should_working_master_own(const as_namespace* ns, uint32_t pid,
		uint32_t repl_ix)
{
	const as_partition* p = &ns->partitions[pid];

	return find_self_in_replicas(p) == (int)repl_ix || p->immigrators[repl_ix];
}
