/*
 * partition.c
 *
 * Copyright (C) 2008-2018 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/
 */

//==========================================================
// Includes.
//

#include "fabric/partition.h"

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_atomic.h"
#include "citrusleaf/cf_b64.h"

#include "cf_mutex.h"
#include "fault.h"
#include "node.h"

#include "base/cfg.h"
#include "base/datamodel.h"
#include "base/index.h"
#include "base/proto.h"
#include "base/transaction.h"
#include "fabric/partition_balance.h"


//==========================================================
// Forward declarations.
//

cf_node find_best_node(const as_partition* p, bool is_read);
void accumulate_replica_stats(const as_partition* p, uint64_t* p_n_objects,
		uint64_t* p_n_tombstones);
void partition_reserve_lockfree(as_partition* p, as_namespace* ns,
		as_partition_reservation* rsv);
char partition_descriptor(const as_partition* p);
int partition_get_replica_self_lockfree(const as_namespace* ns, uint32_t pid);
bool should_working_master_own(const as_namespace* ns, uint32_t pid,
		uint32_t repl_ix);


//==========================================================
// Public API.
//

void
as_partition_init(as_namespace* ns, uint32_t pid)
{
	as_partition* p = &ns->partitions[pid];

	// Note - as_partition has been zeroed since it's a member of as_namespace.
	// Set non-zero members.

	cf_mutex_init(&p->lock);

	p->id = pid;

	if (ns->cold_start) {
		return; // trees are created later when we know which ones we own
	}

	p->tree = as_index_tree_resume(&ns->tree_shared, ns->xmem_trees, pid,
			as_partition_tree_done, (void*)p);
}

void
as_partition_shutdown(as_namespace* ns, uint32_t pid)
{
	as_partition* p = &ns->partitions[pid];

	while (true) {
		cf_mutex_lock(&p->lock);

		as_index_tree* tree = p->tree;
		as_index_tree_reserve(tree);

		cf_mutex_unlock(&p->lock);

		// Must come outside partition lock, since transactions may take
		// partition lock under the record (sprig) lock.
		as_index_tree_block(tree);

		// If lucky, this remains locked and we complete shutdown.
		cf_mutex_lock(&p->lock);

		if (tree == p->tree) {
			break; // lucky - same tree we blocked
		}

		// Bad luck - blocked a tree that just got switched, block the new one.
		cf_mutex_unlock(&p->lock);
	}
}

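// Note - assumed to be called with the partition lock held - the fields
// cleared here are elsewhere guarded by it.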
void
as_partition_freeze(as_partition* p)
{
	p->working_master = (cf_node)0;

	p->n_nodes = 0;
	p->n_replicas = 0;
	p->n_dupl = 0;

	p->pending_emigrations = 0;
	p->pending_lead_emigrations = 0;
	p->pending_immigrations = 0;

	p->n_witnesses = 0;
}

// Get a list of all nodes (excluding self) that are replicas for a specified
// partition: place the list in *nv and return the number of nodes found.
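//
// Usage sketch (illustrative only - capacity constant assumed): the caller
// supplies a vector with room for every replica, e.g.
//
//   cf_node nodes[AS_CLUSTER_SZ];
//   uint32_t n_others = as_partition_get_other_replicas(p, nodes);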
uint32_t
as_partition_get_other_replicas(as_partition* p, cf_node* nv)
{
	uint32_t n_other_replicas = 0;

	cf_mutex_lock(&p->lock);

	for (uint32_t repl_ix = 0; repl_ix < p->n_replicas; repl_ix++) {
		// Don't ever include yourself.
		if (p->replicas[repl_ix] == g_config.self_node) {
			continue;
		}

		// Copy the node ID into the user-supplied vector.
		nv[n_other_replicas++] = p->replicas[repl_ix];
	}

	cf_mutex_unlock(&p->lock);

	return n_other_replicas;
}

cf_node
as_partition_writable_node(as_namespace* ns, uint32_t pid)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (p->n_replicas == 0) {
		// This partition is unavailable.
		cf_mutex_unlock(&p->lock);
		return (cf_node)0;
	}

	cf_node best_node = find_best_node(p, false);

	cf_mutex_unlock(&p->lock);

	return best_node;
}

// If this node is an eventual master, return the acting master, else return 0.
cf_node
as_partition_proxyee_redirect(as_namespace* ns, uint32_t pid)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	cf_node node = (cf_node)0;

	if (g_config.self_node == p->replicas[0] &&
			g_config.self_node != p->working_master) {
		node = p->working_master;
	}

	cf_mutex_unlock(&p->lock);

	return node;
}

void
as_partition_get_replicas_master_str(cf_dyn_buf* db)
{
	size_t db_sz = db->used_sz;

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		cf_dyn_buf_append_string(db, ns->name);
		cf_dyn_buf_append_char(db, ':');
		cf_dyn_buf_append_buf(db, (uint8_t*)ns->replica_maps[0].b64map,
				sizeof(ns->replica_maps[0].b64map));
		cf_dyn_buf_append_char(db, ';');
	}

	if (db_sz != db->used_sz) {
		cf_dyn_buf_chomp(db);
	}
}

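// Output appended per namespace:
// "<ns-name>:[<regime>,]<repl-factor>,<b64map-0>[,<b64map-1>...];".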
void
as_partition_get_replicas_all_str(cf_dyn_buf* db, bool include_regime)
{
	size_t db_sz = db->used_sz;

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		cf_dyn_buf_append_string(db, ns->name);
		cf_dyn_buf_append_char(db, ':');

		if (include_regime) {
			cf_dyn_buf_append_uint32(db, ns->rebalance_regime);
			cf_dyn_buf_append_char(db, ',');
		}

		uint32_t repl_factor = ns->replication_factor;

		// If we haven't rebalanced yet, report 1 column with no ownership.
		if (repl_factor == 0) {
			repl_factor = 1;
		}

		cf_dyn_buf_append_uint32(db, repl_factor);

		for (uint32_t repl_ix = 0; repl_ix < repl_factor; repl_ix++) {
			cf_dyn_buf_append_char(db, ',');
			cf_dyn_buf_append_buf(db,
					(uint8_t*)&ns->replica_maps[repl_ix].b64map,
					sizeof(ns->replica_maps[repl_ix].b64map));
		}

		cf_dyn_buf_append_char(db, ';');
	}

	if (db_sz != db->used_sz) {
		cf_dyn_buf_chomp(db);
	}
}

void
as_partition_get_replica_stats(as_namespace* ns, repl_stats* p_stats)
{
	memset(p_stats, 0, sizeof(repl_stats));

	for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
		as_partition* p = &ns->partitions[pid];

		cf_mutex_lock(&p->lock);

		if (g_config.self_node == p->working_master) {
			accumulate_replica_stats(p,
					&p_stats->n_master_objects,
					&p_stats->n_master_tombstones);
		}
		else if (find_self_in_replicas(p) >= 0) { // -1 if not
			accumulate_replica_stats(p,
					&p_stats->n_prole_objects,
					&p_stats->n_prole_tombstones);
		}
		else {
			accumulate_replica_stats(p,
					&p_stats->n_non_replica_objects,
					&p_stats->n_non_replica_tombstones);
		}

		cf_mutex_unlock(&p->lock);
	}
}

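// Usage sketch (illustrative only): a successful reservation holds a tree
// reference, so it must be paired with as_partition_release(), e.g.
//
//   as_partition_reservation rsv;
//
//   as_partition_reserve(ns, pid, &rsv);
//   // ... operate on rsv.tree ...
//   as_partition_release(&rsv);
//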
// TODO - what if partition is unavailable?
void
as_partition_reserve(as_namespace* ns, uint32_t pid,
		as_partition_reservation* rsv)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	partition_reserve_lockfree(p, ns, rsv);

	cf_mutex_unlock(&p->lock);
}

int
as_partition_reserve_replica(as_namespace* ns, uint32_t pid,
		as_partition_reservation* rsv)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! is_self_replica(p)) {
		cf_mutex_unlock(&p->lock);
		return AS_ERR_CLUSTER_KEY_MISMATCH;
	}

	partition_reserve_lockfree(p, ns, rsv);

	cf_mutex_unlock(&p->lock);

	return AS_OK;
}

// Returns:
// 0 - reserved - node parameter returns self node
// -1 - not reserved - node parameter returns other "better" node
// -2 - not reserved - node parameter not filled - partition is unavailable
int
as_partition_reserve_write(as_namespace* ns, uint32_t pid,
		as_partition_reservation* rsv, cf_node* node)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	// If this partition is frozen, return.
	if (p->n_replicas == 0) {
		if (node) {
			*node = (cf_node)0;
		}

		cf_mutex_unlock(&p->lock);
		return -2;
	}

	cf_node best_node = find_best_node(p, false);

	if (node) {
		*node = best_node;
	}

	// If this node is not the appropriate one, return.
	if (best_node != g_config.self_node) {
		cf_mutex_unlock(&p->lock);
		return -1;
	}

	partition_reserve_lockfree(p, ns, rsv);

	cf_mutex_unlock(&p->lock);

	return 0;
}

// Returns:
// 0 - reserved - node parameter returns self node
// -1 - not reserved - node parameter returns other "better" node
// -2 - not reserved - node parameter not filled - partition is unavailable
int
as_partition_reserve_read_tr(as_namespace* ns, uint32_t pid, as_transaction* tr,
		cf_node* node)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	// Handle unavailable partition.
	if (p->n_replicas == 0) {
		int result = partition_reserve_unavailable(ns, p, tr, node);

		if (result == 0) {
			partition_reserve_lockfree(p, ns, &tr->rsv);
		}

		cf_mutex_unlock(&p->lock);
		return result;
	}

	cf_node best_node = find_best_node(p,
			! partition_reserve_promote(ns, p, tr));

	if (node) {
		*node = best_node;
	}

	// If this node is not the appropriate one, return.
	if (best_node != g_config.self_node) {
		cf_mutex_unlock(&p->lock);
		return -1;
	}

	partition_reserve_lockfree(p, ns, &tr->rsv);

	cf_mutex_unlock(&p->lock);

	return 0;
}

// Reserves all query-able partitions.
// Returns the number of partitions reserved.
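//
// Usage sketch (illustrative only): both arrays are sized by AS_PARTITIONS,
// and only reservations flagged true were taken (and need releasing), e.g.
//
//   bool can_query[AS_PARTITIONS];
//   as_partition_reservation rsv[AS_PARTITIONS];
//
//   int n_reserved = as_partition_prereserve_query(ns, can_query, rsv);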
int
as_partition_prereserve_query(as_namespace* ns, bool can_partition_query[],
		as_partition_reservation rsv[])
{
	int reserved = 0;

	for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
		if (as_partition_reserve_query(ns, pid, &rsv[pid])) {
			can_partition_query[pid] = false;
		}
		else {
			can_partition_query[pid] = true;
			reserved++;
		}
	}

	return reserved;
}

// Reserve a partition for query.
// Return value 0 means the reservation was taken, non-zero means not - -1 if
// another node should handle it, -2 if the partition is unavailable.
int
as_partition_reserve_query(as_namespace* ns, uint32_t pid,
		as_partition_reservation* rsv)
{
	return as_partition_reserve_write(ns, pid, rsv, NULL);
}

// Succeeds if we are full working master or prole.
int
as_partition_reserve_xdr_read(as_namespace* ns, uint32_t pid,
		as_partition_reservation* rsv)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	int res = -1;

	if (p->pending_immigrations == 0 &&
			(g_config.self_node == p->working_master ||
					find_self_in_replicas(p) >= 0)) {
		partition_reserve_lockfree(p, ns, rsv);
		res = 0;
	}

	cf_mutex_unlock(&p->lock);

	return res;
}

void
as_partition_reservation_copy(as_partition_reservation* dst,
		as_partition_reservation* src)
{
	dst->ns = src->ns;
	dst->p = src->p;
	dst->tree = src->tree;
	dst->regime = src->regime;
	dst->n_dupl = src->n_dupl;

	if (dst->n_dupl != 0) {
		memcpy(dst->dupl_nodes, src->dupl_nodes, sizeof(cf_node) * dst->n_dupl);
	}
}

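// Note - releases the tree reference taken via partition_reserve_lockfree().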
void
as_partition_release(as_partition_reservation* rsv)
{
	as_index_tree_release(rsv->tree);
}

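// Illustration (assuming 6-bit tree-ids, i.e. MAX_NUM_TREE_IDS == 64 and
// TREE_ID_MASK == 0x3F): ids rotate through 0..63, tree_ids_used is a 64-bit
// occupancy mask, and advancing from id 5 claims id 6 via mask 1UL << 6 -
// provided bit 6 is clear.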
void
as_partition_advance_tree_id(as_partition* p, const char* ns_name)
{
	uint32_t n_hanging;

	// Find first available tree-id past current one. Should be very next one.
	for (n_hanging = 0; n_hanging < MAX_NUM_TREE_IDS; n_hanging++) {
		p->tree_id = (p->tree_id + 1) & TREE_ID_MASK;

		uint64_t id_mask = 1UL << p->tree_id;

		if ((p->tree_ids_used & id_mask) == 0) {
			// Claim tree-id. Claim is relinquished when tree is destroyed.
			p->tree_ids_used |= id_mask;
			break;
		}
	}

	// If no tree-id is available, crash. Should never happen.
	if (n_hanging == MAX_NUM_TREE_IDS) {
		cf_crash(AS_PARTITION, "{%s} pid %u has %u dropped trees hanging",
				ns_name, p->id, n_hanging);
	}

	// Too many hanging trees - ref-count leak? Offer chance to warm restart.
	if (n_hanging > MAX_NUM_TREE_IDS / 2) {
		cf_warning(AS_PARTITION, "{%s} pid %u has %u dropped trees hanging",
				ns_name, p->id, n_hanging);
	}
}

// Callback made when dropped as_index_tree is finally destroyed.
void
as_partition_tree_done(uint8_t id, void* udata)
{
	as_partition* p = (as_partition*)udata;

	cf_mutex_lock(&p->lock);

	// Relinquish tree-id.
	p->tree_ids_used &= ~(1UL << id);

	cf_mutex_unlock(&p->lock);
}

void
as_partition_getinfo_str(cf_dyn_buf* db)
{
	size_t db_sz = db->used_sz;

	cf_dyn_buf_append_string(db, "namespace:partition:state:n_replicas:replica:"
			"n_dupl:working_master:emigrates:lead_emigrates:immigrates:records:"
			"tombstones:regime:version:final_version;");

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
			as_partition* p = &ns->partitions[pid];

			cf_mutex_lock(&p->lock);

			cf_dyn_buf_append_string(db, ns->name);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, pid);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_char(db, partition_descriptor(p));
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->n_replicas);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_int(db, find_self_in_replicas(p));
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->n_dupl);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint64_x(db, p->working_master);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->pending_emigrations);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->pending_lead_emigrations);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->pending_immigrations);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, as_index_tree_size(p->tree));
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint64(db, p->n_tombstones);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_uint32(db, p->regime);
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_string(db, VERSION_AS_STRING(&p->version));
			cf_dyn_buf_append_char(db, ':');
			cf_dyn_buf_append_string(db, VERSION_AS_STRING(&p->final_version));

			cf_dyn_buf_append_char(db, ';');

			cf_mutex_unlock(&p->lock);
		}
	}

	if (db_sz != db->used_sz) {
		cf_dyn_buf_chomp(db); // take back the final ';'
	}
}


//==========================================================
// Public API - client view replica maps.
//

void
client_replica_maps_create(as_namespace* ns)
{
	uint32_t size = sizeof(client_replica_map) * ns->cfg_replication_factor;

	ns->replica_maps = cf_malloc(size);
	memset(ns->replica_maps, 0, size);

	for (uint32_t repl_ix = 0; repl_ix < ns->cfg_replication_factor;
			repl_ix++) {
		client_replica_map* repl_map = &ns->replica_maps[repl_ix];

		cf_mutex_init(&repl_map->write_lock);

		cf_b64_encode((uint8_t*)repl_map->bitmap,
				(uint32_t)sizeof(repl_map->bitmap), (char*)repl_map->b64map);
	}
}

void
client_replica_maps_clear(as_namespace* ns)
{
	memset(ns->replica_maps, 0,
			sizeof(client_replica_map) * ns->cfg_replication_factor);

	for (uint32_t repl_ix = 0; repl_ix < ns->cfg_replication_factor;
			repl_ix++) {
		client_replica_map* repl_map = &ns->replica_maps[repl_ix];

		cf_b64_encode((uint8_t*)repl_map->bitmap,
				(uint32_t)sizeof(repl_map->bitmap), (char*)repl_map->b64map);
	}
}

bool
client_replica_maps_update(as_namespace* ns, uint32_t pid)
{
	uint32_t byte_i = pid >> 3;
	uint32_t byte_chunk = (byte_i / 3);
	uint32_t chunk_bitmap_offset = byte_chunk * 3;
	uint32_t chunk_b64map_offset = byte_chunk << 2;

	uint32_t bytes_from_end = CLIENT_BITMAP_BYTES - chunk_bitmap_offset;
	uint32_t input_size = bytes_from_end > 3 ? 3 : bytes_from_end;
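	// Base64 encodes each 3 input bytes as 4 output characters, so only the
	// 3-byte-aligned chunk containing this partition's bitmap byte must be
	// re-encoded, into the corresponding 4-character slot of the b64 map.
	// E.g. pid 100: byte_i 12, byte_chunk 4, bitmap offset 12, b64 offset 16.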

	int replica = partition_get_replica_self_lockfree(ns, pid); // -1 if not
	uint8_t set_mask = 0x80 >> (pid & 0x7);
	bool changed = false;

	for (int repl_ix = 0; repl_ix < (int)ns->cfg_replication_factor;
			repl_ix++) {
		client_replica_map* repl_map = &ns->replica_maps[repl_ix];

		volatile uint8_t* mbyte = repl_map->bitmap + byte_i;

		bool owned = replica == repl_ix ||
				// Working master also owns all immigrating prole columns, and
				// if it's an acting master in a prole column, that column (e.g.
				// full, and migrating to newly added master).
				(replica == 0 && // am working master
						should_working_master_own(ns, pid, repl_ix));

		bool is_set = (*mbyte & set_mask) != 0;
		bool needs_update = (owned && ! is_set) || (! owned && is_set);

		if (! needs_update) {
			continue;
		}

		volatile uint8_t* bitmap_chunk = repl_map->bitmap + chunk_bitmap_offset;
		volatile char* b64map_chunk = repl_map->b64map + chunk_b64map_offset;

		cf_mutex_lock(&repl_map->write_lock);

		*mbyte ^= set_mask;
		cf_b64_encode((uint8_t*)bitmap_chunk, input_size, (char*)b64map_chunk);

		cf_mutex_unlock(&repl_map->write_lock);

		changed = true;
	}

	return changed;
}

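// Note - consults only the first (repl-ix 0) map - i.e. a partition is
// reported queryable on this node only if it's the working master.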
bool
client_replica_maps_is_partition_queryable(const as_namespace* ns, uint32_t pid)
{
	uint32_t byte_i = pid >> 3;

	const client_replica_map* repl_map = ns->replica_maps;
	const volatile uint8_t* mbyte = repl_map->bitmap + byte_i;

	uint8_t set_mask = 0x80 >> (pid & 0x7);

	return (*mbyte & set_mask) != 0;
}


//==========================================================
// Local helpers.
//

// Find best node to handle read/write. Called within partition lock.
cf_node
find_best_node(const as_partition* p, bool is_read)
{
	// Working master (final or acting) returns self, eventual master returns
	// acting master. Others don't have p->working_master set.
	if (p->working_master != (cf_node)0) {
		return p->working_master;
	}

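	// A prole with no pending immigrations may serve reads. Self at repl-ix 0
	// needn't be considered here - per the comment above, a (final or acting)
	// master has working_master set and has already returned.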
	if (is_read && p->pending_immigrations == 0 &&
			find_self_in_replicas(p) > 0) {
		return g_config.self_node; // may read from prole that's got everything
	}

	return p->replicas[0]; // final master as a last resort
}

void
accumulate_replica_stats(const as_partition* p, uint64_t* p_n_objects,
		uint64_t* p_n_tombstones)
{
	int64_t n_tombstones = (int64_t)p->n_tombstones;
	int64_t n_objects = (int64_t)as_index_tree_size(p->tree) - n_tombstones;

	*p_n_objects += n_objects > 0 ? (uint64_t)n_objects : 0;
	*p_n_tombstones += (uint64_t)n_tombstones;
}

void
partition_reserve_lockfree(as_partition* p, as_namespace* ns,
		as_partition_reservation* rsv)
{
	as_index_tree_reserve(p->tree);

	rsv->ns = ns;
	rsv->p = p;
	rsv->tree = p->tree;
	rsv->regime = p->regime;
	rsv->n_dupl = p->n_dupl;

	if (rsv->n_dupl != 0) {
		memcpy(rsv->dupl_nodes, p->dupls, sizeof(cf_node) * rsv->n_dupl);
	}
}

char
partition_descriptor(const as_partition* p)
{
	if (find_self_in_replicas(p) >= 0) { // -1 if not
		return p->pending_immigrations == 0 ? 'S' : 'D';
	}

	if (as_partition_version_is_null(&p->version)) {
		return 'A';
	}

	return as_partition_version_has_data(&p->version) ? 'Z' : 'X';
}

int
partition_get_replica_self_lockfree(const as_namespace* ns, uint32_t pid)
{
	const as_partition* p = &ns->partitions[pid];

	if (g_config.self_node == p->working_master) {
		return 0;
	}

	int self_n = find_self_in_replicas(p); // -1 if not

	if (self_n > 0 && p->pending_immigrations == 0 &&
			// Check self_n < n_repl only because n_repl could be out-of-sync
			// with (less than) partition's replica list count.
			self_n < (int)ns->replication_factor) {
		return self_n;
	}

	return -1; // not a replica
}

bool
should_working_master_own(const as_namespace* ns, uint32_t pid,
		uint32_t repl_ix)
{
	const as_partition* p = &ns->partitions[pid];

	return find_self_in_replicas(p) == (int)repl_ix || p->immigrators[repl_ix];
}