/*
 * partition_balance.c
 *
 * Copyright (C) 2016-2019 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/
 */

//==========================================================
// Includes.
//

#include "fabric/partition_balance.h"

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_atomic.h"
#include "citrusleaf/cf_hash_math.h"
#include "citrusleaf/cf_queue.h"

#include "cf_mutex.h"
#include "fault.h"
#include "node.h"

#include "base/cfg.h"
#include "base/datamodel.h"
#include "base/index.h"
#include "fabric/exchange.h"
#include "fabric/hb.h"
#include "fabric/migrate.h"
#include "fabric/partition.h"
#include "storage/storage.h"


//==========================================================
// Typedefs & constants.
//

const as_partition_version ZERO_VERSION = { 0 };


//==========================================================
// Globals.
//

cf_atomic32 g_partition_generation = (uint32_t)-1;
uint64_t g_rebalance_sec;
uint64_t g_rebalance_generation = 0;

// Using int for 4-byte size, but maintaining bool semantics.
// TODO - ok as non-volatile, but should selectively load/store in the future.
static int g_init_balance_done = false;

static cf_atomic32 g_migrate_num_incoming = 0;

// Using int for 4-byte size, but maintaining bool semantics.
volatile int g_allow_migrations = false;

uint64_t g_hashed_pids[AS_PARTITIONS];

// Shortcuts to values set by as_exchange, for use in partition balance only.
uint32_t g_cluster_size = 0;
cf_node* g_succession = NULL;

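// The global rebalance tables - conceptually one row per pid, as shown in the
// diagram above fill_global_tables().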
cf_node g_full_node_seq_table[AS_CLUSTER_SZ * AS_PARTITIONS];
sl_ix_t g_full_sl_ix_table[AS_CLUSTER_SZ * AS_PARTITIONS];


//==========================================================
// Forward declarations.
//

// Only partition_balance hooks into exchange.
extern cf_node* as_exchange_succession_unsafe();

// Helpers - generic.
void pb_task_init(pb_task* task, cf_node dest, as_namespace* ns, uint32_t pid,
		uint64_t cluster_key, pb_task_type type, uint32_t tx_flags);
void create_trees(as_partition* p, as_namespace* ns);
void drop_trees(as_partition* p);

// Helpers - balance partitions.
void fill_global_tables();
void balance_namespace_ap(as_namespace* ns, cf_queue* mq);
void apply_single_replica_limit_ap(as_namespace* ns);
void fill_translation(int translation[], const as_namespace* ns);
void fill_namespace_rows(const cf_node* full_node_seq,
		const sl_ix_t* full_sl_ix, cf_node* ns_node_seq, sl_ix_t* ns_sl_ix,
		const as_namespace* ns, const int translation[]);
uint32_t find_self(const cf_node* ns_node_seq, const as_namespace* ns);
int find_working_master_ap(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns);
int shift_working_master(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns, int working_master_n,
		const as_partition_version* working_master_version);
uint32_t find_duplicates_ap(const as_partition* p, const cf_node* ns_node_seq,
		const sl_ix_t* ns_sl_ix, const struct as_namespace_s* ns,
		uint32_t working_master_n, cf_node dupls[]);
uint32_t fill_immigrators(as_partition* p, const sl_ix_t* ns_sl_ix,
		as_namespace* ns, uint32_t working_master_n, uint32_t n_dupl);
void advance_version_ap(as_partition* p, const sl_ix_t* ns_sl_ix,
		as_namespace* ns, uint32_t self_n, uint32_t working_master_n,
		uint32_t n_dupl, const cf_node dupls[]);
uint32_t fill_family_versions(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns, uint32_t working_master_n, uint32_t n_dupl,
		const cf_node dupls[], as_partition_version family_versions[]);
bool has_replica_parent(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns, const as_partition_version* subset_version,
		uint32_t subset_n);
uint32_t find_family(const as_partition_version* self_version,
		uint32_t n_families, const as_partition_version family_versions[]);
void queue_namespace_migrations(as_partition* p, as_namespace* ns,
		uint32_t self_n, cf_node working_master, uint32_t n_dupl,
		cf_node dupls[], const uint32_t lead_flags[], cf_queue* mq);
void fill_witnesses(as_partition* p, const cf_node* ns_node_seq,
		const sl_ix_t* ns_sl_ix, as_namespace* ns);
void handle_version_change(as_partition* p, struct as_namespace_s* ns,
		as_partition_version* orig_version);

// Helpers - migration-related.
bool partition_immigration_is_valid(const as_partition* p, cf_node source_node,
		const as_namespace* ns, const char* tag);


//==========================================================
// Inlines & macros.
//

static inline bool
is_self_final_master(const as_partition* p)
{
	return p->replicas[0] == g_config.self_node;
}

static inline bool
is_family_same(const as_partition_version* v1, const as_partition_version* v2)
{
	return v1->ckey == v2->ckey && v1->family == v2->family &&
			v1->family != VERSION_FAMILY_UNIQUE;
}


//==========================================================
// Public API - regulate migrations.
//

void
as_partition_balance_disallow_migrations()
{
	cf_detail(AS_PARTITION, "disallow migrations");

	g_allow_migrations = false;
}

bool
as_partition_balance_are_migrations_allowed()
{
	return g_allow_migrations;
}

void
as_partition_balance_synchronize_migrations()
{
	// Acquire and release each partition lock to ensure threads acquiring a
	// partition lock after this will be forced to check the latest cluster key.
	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
			as_partition* p = &ns->partitions[pid];

			cf_mutex_lock(&p->lock);
			cf_mutex_unlock(&p->lock);
		}
	}

	// Prior-round migrations won't decrement g_migrate_num_incoming due to
	// cluster key check.
	cf_atomic32_set(&g_migrate_num_incoming, 0);
}


//==========================================================
// Public API - balance partitions.
//

void
as_partition_balance_init()
{
	// Cache hashed pids for all future rebalances.
	for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
		g_hashed_pids[pid] = cf_hash_fnv64((const uint8_t*)&pid,
				sizeof(uint32_t));
	}

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		uint32_t n_stored = 0;

		for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
			as_partition* p = &ns->partitions[pid];

			as_storage_load_pmeta(ns, p);

			if (as_partition_version_has_data(&p->version)) {
				as_partition_isolate_version(ns, p);
				n_stored++;
			}
		}

		cf_info(AS_PARTITION, "{%s} %u partitions: found %u absent, %u stored",
				ns->name, AS_PARTITIONS, AS_PARTITIONS - n_stored, n_stored);
	}

	partition_balance_init();
}

// Has the node resolved as operating either in a multi-node cluster or as a
// single-node cluster?
bool
as_partition_balance_is_init_resolved()
{
	return g_init_balance_done;
}

void
as_partition_balance_revert_to_orphan()
{
	g_init_balance_done = false;
	g_allow_migrations = false;

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		client_replica_maps_clear(ns);

		for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
			as_partition* p = &ns->partitions[pid];

			cf_mutex_lock(&p->lock);

			as_partition_freeze(p);
			as_partition_isolate_version(ns, p);

			cf_mutex_unlock(&p->lock);
		}

		ns->n_unavailable_partitions = AS_PARTITIONS;
	}

	cf_atomic32_incr(&g_partition_generation);
}

void
as_partition_balance()
{
	// Temporary paranoia.
	static uint64_t last_cluster_key = 0;

	if (last_cluster_key == as_exchange_cluster_key()) {
		cf_warning(AS_PARTITION, "as_partition_balance: cluster key %lx same as last time",
				last_cluster_key);
		return;
	}

	last_cluster_key = as_exchange_cluster_key();
	// End - temporary paranoia.

	// These shortcuts must only be used within the scope of this function.
	g_cluster_size = as_exchange_cluster_size();
	g_succession = as_exchange_succession_unsafe();

	// Each partition separately shuffles the node succession list to generate
	// its own node sequence.
	fill_global_tables();

	cf_queue mq;

	cf_queue_init(&mq, sizeof(pb_task), g_config.n_namespaces * AS_PARTITIONS,
			false);

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		balance_namespace(g_config.namespaces[ns_ix], &mq);
	}

	prepare_for_appeals();

	// All partitions now have replicas assigned, ok to allow transactions.
	g_init_balance_done = true;
	cf_atomic32_incr(&g_partition_generation);

	g_allow_migrations = true;
	cf_detail(AS_PARTITION, "allow migrations");

	g_rebalance_sec = cf_get_seconds(); // must precede process_pb_tasks()

	process_pb_tasks(&mq);
	cf_queue_destroy(&mq);

	g_rebalance_generation++;
}

uint64_t
as_partition_balance_remaining_migrations()
{
	uint64_t remaining_migrations = 0;

	for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) {
		as_namespace* ns = g_config.namespaces[ns_ix];

		remaining_migrations += ns->migrate_tx_partitions_remaining;
		remaining_migrations += ns->migrate_rx_partitions_remaining;
	}

	return remaining_migrations;
}


//==========================================================
// Public API - migration-related as_partition methods.
//

// Currently used only for enterprise build.
bool
as_partition_pending_migrations(as_partition* p)
{
	cf_mutex_lock(&p->lock);

	bool pending = p->pending_immigrations + p->pending_emigrations != 0;

	cf_mutex_unlock(&p->lock);

	return pending;
}

void
as_partition_emigrate_done(as_namespace* ns, uint32_t pid,
		uint64_t orig_cluster_key, cf_node dest_node, uint32_t tx_flags)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! g_allow_migrations || orig_cluster_key != as_exchange_cluster_key()) {
		cf_debug(AS_PARTITION, "{%s:%u} emigrate_done - cluster key mismatch",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return;
	}

	if (p->pending_emigrations == 0) {
		cf_warning(AS_PARTITION, "{%s:%u} emigrate_done - no pending emigrations",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return;
	}

	p->pending_emigrations--;

	int64_t migrates_tx_remaining =
			cf_atomic_int_decr(&ns->migrate_tx_partitions_remaining);

	if (migrates_tx_remaining < 0) {
		cf_warning(AS_PARTITION, "{%s:%u} (%hu,%ld) emigrate_done - counter went negative",
				ns->name, pid, p->pending_emigrations, migrates_tx_remaining);
	}

	if ((tx_flags & TX_FLAGS_LEAD) != 0) {
		p->pending_lead_emigrations--;
		cf_atomic_int_decr(&ns->migrate_tx_partitions_lead_remaining);
	}

	if (! is_self_final_master(p)) {
		emigrate_done_advance_non_master_version(ns, p, tx_flags);
	}

	int dest_ix = index_of_node(p->replicas, p->n_replicas, dest_node);

	cf_assert(dest_ix != -1, AS_PARTITION, "non-replica dest node");

	p->immigrators[dest_ix] = false;

	if (client_replica_maps_update(ns, pid)) {
		cf_atomic32_incr(&g_partition_generation);
	}

	cf_queue mq;
	pb_task task;
	int w_ix = -1;

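	// As final master, if this was the last pending migration, queue an
	// "all done" signal for each witness node - sent below, after dropping
	// the partition lock.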
	if (is_self_final_master(p) &&
			p->pending_emigrations == 0 && p->pending_immigrations == 0) {
		cf_queue_init(&mq, sizeof(pb_task), p->n_witnesses, false);

		for (w_ix = 0; w_ix < (int)p->n_witnesses; w_ix++) {
			pb_task_init(&task, p->witnesses[w_ix], ns, pid, orig_cluster_key,
					PB_TASK_EMIG_SIGNAL_ALL_DONE, TX_FLAGS_CONTINGENT);
			cf_queue_push(&mq, &task);
		}
	}

	cf_mutex_unlock(&p->lock);

	if (w_ix >= 0) {
		while (cf_queue_pop(&mq, &task, CF_QUEUE_NOWAIT) == CF_QUEUE_OK) {
			as_migrate_emigrate(&task);
		}

		cf_queue_destroy(&mq);
	}
}

as_migrate_result
as_partition_immigrate_start(as_namespace* ns, uint32_t pid,
		uint64_t orig_cluster_key, cf_node source_node)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! g_allow_migrations || orig_cluster_key != as_exchange_cluster_key() ||
			immigrate_yield()) {
		cf_debug(AS_PARTITION, "{%s:%u} immigrate_start - cluster key mismatch",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_AGAIN;
	}

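	// Reserve a slot against the configured cap on concurrent incoming
	// migrations - released on the failure paths below, or eventually in
	// as_partition_immigrate_done().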
	uint32_t num_incoming = (uint32_t)cf_atomic32_incr(&g_migrate_num_incoming);

	if (num_incoming > g_config.migrate_max_num_incoming) {
		cf_debug(AS_PARTITION, "{%s:%u} immigrate_start - exceeded max_num_incoming",
				ns->name, pid);
		cf_atomic32_decr(&g_migrate_num_incoming);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_AGAIN;
	}

	if (! partition_immigration_is_valid(p, source_node, ns, "start")) {
		cf_atomic32_decr(&g_migrate_num_incoming);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_FAIL;
	}

	if (! is_self_final_master(p)) {
		immigrate_start_advance_non_master_version(ns, p);
		as_storage_save_pmeta(ns, p);
	}

	cf_mutex_unlock(&p->lock);

	return AS_MIGRATE_OK;
}

as_migrate_result
as_partition_immigrate_done(as_namespace* ns, uint32_t pid,
		uint64_t orig_cluster_key, cf_node source_node)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! g_allow_migrations || orig_cluster_key != as_exchange_cluster_key()) {
		cf_debug(AS_PARTITION, "{%s:%u} immigrate_done - cluster key mismatch",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_FAIL;
	}

	cf_atomic32_decr(&g_migrate_num_incoming);

	if (! partition_immigration_is_valid(p, source_node, ns, "done")) {
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_FAIL;
	}

	p->pending_immigrations--;

	int64_t migrates_rx_remaining =
			cf_atomic_int_decr(&ns->migrate_rx_partitions_remaining);

	// Sanity-check only.
	if (migrates_rx_remaining < 0) {
		cf_warning(AS_PARTITION, "{%s:%u} (%hu,%ld) immigrate_done - counter went negative",
				ns->name, pid, p->pending_immigrations, migrates_rx_remaining);
	}

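	// If this node's last immigration just finished, adopt the final version.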
	if (p->pending_immigrations == 0 &&
			! as_partition_version_same(&p->version, &p->final_version)) {
		p->version = p->final_version;
		as_storage_save_pmeta(ns, p);
	}

	if (! is_self_final_master(p)) {
		if (client_replica_maps_update(ns, pid)) {
			cf_atomic32_incr(&g_partition_generation);
		}

		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_OK;
	}

	// Final master finished an immigration, adjust duplicates.

	if (source_node == p->working_master) {
		p->working_master = g_config.self_node;

		immigrate_done_advance_final_master_version(ns, p);
	}
	else {
		p->n_dupl = remove_node(p->dupls, p->n_dupl, source_node);
	}

	if (client_replica_maps_update(ns, pid)) {
		cf_atomic32_incr(&g_partition_generation);
	}

	if (p->pending_immigrations != 0) {
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_OK;
	}

	// Final master finished all immigration.

	cf_queue mq;
	pb_task task;

	if (p->pending_emigrations != 0) {
		cf_queue_init(&mq, sizeof(pb_task), p->n_replicas - 1, false);

		for (uint32_t repl_ix = 1; repl_ix < p->n_replicas; repl_ix++) {
			if (p->immigrators[repl_ix]) {
				pb_task_init(&task, p->replicas[repl_ix], ns, pid,
						orig_cluster_key, PB_TASK_EMIG_TRANSFER,
						TX_FLAGS_CONTINGENT);
				cf_queue_push(&mq, &task);
			}
		}
	}
	else {
		cf_queue_init(&mq, sizeof(pb_task), p->n_witnesses, false);

		for (uint16_t w_ix = 0; w_ix < p->n_witnesses; w_ix++) {
			pb_task_init(&task, p->witnesses[w_ix], ns, pid, orig_cluster_key,
					PB_TASK_EMIG_SIGNAL_ALL_DONE, TX_FLAGS_CONTINGENT);
			cf_queue_push(&mq, &task);
		}
	}

	cf_mutex_unlock(&p->lock);

	while (cf_queue_pop(&mq, &task, 0) == CF_QUEUE_OK) {
		as_migrate_emigrate(&task);
	}

	cf_queue_destroy(&mq);

	return AS_MIGRATE_OK;
}

as_migrate_result
as_partition_migrations_all_done(as_namespace* ns, uint32_t pid,
		uint64_t orig_cluster_key)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! g_allow_migrations || orig_cluster_key != as_exchange_cluster_key()) {
		cf_debug(AS_PARTITION, "{%s:%u} all_done - cluster key mismatch",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_FAIL;
	}

	if (p->pending_emigrations != 0) {
		cf_debug(AS_PARTITION, "{%s:%u} all_done - eagain",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return AS_MIGRATE_AGAIN;
	}

	// Not a replica and non-null version ...
	if (! is_self_replica(p) && ! as_partition_version_is_null(&p->version)) {
		// ... and not quiesced - drop partition.
		if (drop_superfluous_version(p, ns)) {
			drop_trees(p);
			as_storage_save_pmeta(ns, p);
		}
		// ... or quiesced (more than one node) - become subset of final
		// version.
		else if (adjust_superfluous_version(p, ns)) {
			as_storage_save_pmeta(ns, p);
		}
	}

	cf_mutex_unlock(&p->lock);

	return AS_MIGRATE_OK;
}

void
as_partition_signal_done(as_namespace* ns, uint32_t pid,
		uint64_t orig_cluster_key)
{
	as_partition* p = &ns->partitions[pid];

	cf_mutex_lock(&p->lock);

	if (! g_allow_migrations || orig_cluster_key != as_exchange_cluster_key()) {
		cf_debug(AS_PARTITION, "{%s:%u} signal_done - cluster key mismatch",
				ns->name, pid);
		cf_mutex_unlock(&p->lock);
		return;
	}

	cf_atomic_int_decr(&ns->migrate_signals_remaining);

	cf_mutex_unlock(&p->lock);
}


//==========================================================
// Local helpers - generic.
//

void
pb_task_init(pb_task* task, cf_node dest, as_namespace* ns,
		uint32_t pid, uint64_t cluster_key, pb_task_type type,
		uint32_t tx_flags)
{
	task->dest = dest;
	task->ns = ns;
	task->pid = pid;
	task->type = type;
	task->tx_flags = tx_flags;
	task->cluster_key = cluster_key;
}

void
create_trees(as_partition* p, as_namespace* ns)
{
	cf_assert(! p->tree, AS_PARTITION, "unexpected - tree already exists");

	as_partition_advance_tree_id(p, ns->name);

	p->tree = as_index_tree_create(&ns->tree_shared, p->tree_id,
			as_partition_tree_done, (void*)p);
}

void
drop_trees(as_partition* p)
{
	if (! p->tree) {
		return; // CP signals can get here - 0e/0r versions are witnesses
	}

	as_index_tree_release(p->tree);
	p->tree = NULL;

	// TODO - consider p->n_tombstones?
	cf_atomic32_set(&p->max_void_time, 0);
}


//==========================================================
// Local helpers - balance partitions.
//

// Succession list - all nodes in cluster
// +---------------+
// | A | B | C | D |
// +---------------+
//
// Succession list index (sl_ix) - used as version table and rack-id index
// +---------------+
// | 0 | 1 | 2 | 3 |
// +---------------+
//
// Every partition shuffles the succession list independently, e.g. for pid 0:
// Hash the node names with the pid:
//  H(A,0) = Y, H(B,0) = X, H(C,0) = W, H(D,0) = Z
// Store sl_ix in last byte of hash results so it doesn't affect sort:
//  +-----------------------+
//  | Y_0 | X_1 | W_2 | Z_3 |
//  +-----------------------+
// This sorts to:
//  +-----------------------+
//  | W_2 | X_1 | Y_0 | Z_3 |
//  +-----------------------+
// Replace original node names, and keep sl_ix order, resulting in:
//  +---------------+    +---------------+
//  | C | B | A | D |    | 2 | 1 | 0 | 3 |
//  +---------------+    +---------------+
//
// Node sequence table      Succession list index table
//   pid                      pid
//  +===+---------------+    +===+---------------+
//  | 0 | C | B | A | D |    | 0 | 2 | 1 | 0 | 3 |
//  +===+---------------+    +===+---------------+
//  | 1 | A | D | C | B |    | 1 | 0 | 3 | 2 | 1 |
//  +===+---------------+    +===+---------------+
//  | 2 | D | C | B | A |    | 2 | 3 | 2 | 1 | 0 |
//  +===+---------------+    +===+---------------+
//  | 3 | B | A | D | C |    | 3 | 1 | 0 | 3 | 2 |
//  +===+---------------+    +===+---------------+
//  | 4 | D | B | C | A |    | 4 | 3 | 1 | 2 | 0 |
//  +===+---------------+    +===+---------------+
// ... to pid 4095.
//
// We keep the succession list index table so we can refer back to namespaces'
// partition version tables and rack-id lists, where nodes are in the original
// succession list order.
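//
// For illustration only (hypothetical hash values, 4-node cluster): if
// cf_hash_jen64() yields 0x...A7C4 for node n = 2, masking with
// AS_CLUSTER_SZ_MASKP clears the index field (0x...A700, assuming the "last
// byte" above holds the index) and adding n overlays the succession list
// index (0x...A702). After sorting on the full values, the index is recovered
// via AS_CLUSTER_SZ_MASKN and the hash is replaced by the real node ID.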
void
fill_global_tables()
{
	uint64_t hashed_nodes[g_cluster_size];

	for (uint32_t n = 0; n < g_cluster_size; n++) {
		hashed_nodes[n] = cf_hash_fnv64((const uint8_t*)&g_succession[n],
				sizeof(cf_node));
	}

	// Build the node sequence table.
	for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) {
		inter_hash h;

		h.hashed_pid = g_hashed_pids[pid];

		for (uint32_t n = 0; n < g_cluster_size; n++) {
			h.hashed_node = hashed_nodes[n];

			cf_node* node_p = &FULL_NODE_SEQ(pid, n);

			*node_p = cf_hash_jen64((const uint8_t*)&h, sizeof(h));

			// Overlay index onto last byte.
			*node_p &= AS_CLUSTER_SZ_MASKP;
			*node_p += n;
		}

		// Sort the hashed node values.
		qsort(&FULL_NODE_SEQ(pid, 0), g_cluster_size, sizeof(cf_node),
				cf_node_compare_desc);

		// Overwrite the sorted hash values with the original node IDs.
		for (uint32_t n = 0; n < g_cluster_size; n++) {
			cf_node* node_p = &FULL_NODE_SEQ(pid, n);
			sl_ix_t sl_ix = (sl_ix_t)(*node_p & AS_CLUSTER_SZ_MASKN);

			*node_p = g_succession[sl_ix];

			// Saved to refer back to partition version table and rack-id list.
			FULL_SL_IX(pid, n) = sl_ix;
		}
	}
}

void
balance_namespace_ap(as_namespace* ns, cf_queue* mq)
{
	bool ns_less_than_global = ns->cluster_size != g_cluster_size;

	if (ns_less_than_global) {
		cf_info(AS_PARTITION, "{%s} is on %u of %u nodes", ns->name,
				ns->cluster_size, g_cluster_size);
	}

	// Figure out effective replication factor in the face of node failures.
	apply_single_replica_limit_ap(ns);

	// Active size will be less than cluster size if nodes are quiesced.
	set_active_size(ns);

	uint32_t n_racks = rack_count(ns);

	// If a namespace is not on all nodes, or is rack-aware, or uniform balance
	// is preferred, or nodes are quiesced, it can't use the global node
	// sequence and index tables.
	bool ns_not_equal_global = ns_less_than_global || n_racks != 1 ||
			ns->prefer_uniform_balance || ns->active_size != ns->cluster_size;

	// The translation array is used to convert global table rows to namespace
	// rows, if necessary.
	int translation[ns_less_than_global ? g_cluster_size : 0];

	if (ns_less_than_global) {
		fill_translation(translation, ns);
	}

	uint32_t claims_size = ns->prefer_uniform_balance ?
			ns->replication_factor * g_cluster_size : 0;
	uint32_t claims[claims_size];
	uint32_t target_claims[claims_size];

	if (ns->prefer_uniform_balance) {
		memset(claims, 0, sizeof(claims));
		init_target_claims_ap(ns, translation, target_claims);
	}

	uint32_t ns_pending_emigrations = 0;
	uint32_t ns_pending_lead_emigrations = 0;
	uint32_t ns_pending_immigrations = 0;
	uint32_t ns_pending_signals = 0;

	uint32_t ns_fresh_partitions = 0;

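	// Partitions are processed in groups so that each group's partition
	// metadata can be flushed, and its locks released, together - see the end
	// of the loop below.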
	for (uint32_t pid_group = 0; pid_group < NUM_PID_GROUPS; pid_group++) {
		uint32_t start_pid = pid_group * PIDS_PER_GROUP;
		uint32_t end_pid = start_pid + PIDS_PER_GROUP;

		for (uint32_t pid = start_pid; pid < end_pid; pid++) {
			as_partition* p = &ns->partitions[pid];

			cf_node* full_node_seq = &FULL_NODE_SEQ(pid, 0);
			sl_ix_t* full_sl_ix = &FULL_SL_IX(pid, 0);

			// Usually a namespace can simply use the global tables...
			cf_node* ns_node_seq = full_node_seq;
			sl_ix_t* ns_sl_ix = full_sl_ix;

			cf_node stack_node_seq[ns_not_equal_global ? ns->cluster_size : 0];
			sl_ix_t stack_sl_ix[ns_not_equal_global ? ns->cluster_size : 0];

			// ... but sometimes a namespace is different.
			if (ns_not_equal_global) {
				ns_node_seq = stack_node_seq;
				ns_sl_ix = stack_sl_ix;

				fill_namespace_rows(full_node_seq, full_sl_ix, ns_node_seq,
						ns_sl_ix, ns, translation);

				if (ns->active_size != ns->cluster_size) {
					quiesce_adjust_row(ns_node_seq, ns_sl_ix, ns);
				}

				if (ns->prefer_uniform_balance) {
					uniform_adjust_row(ns_node_seq, ns->active_size, ns_sl_ix,
							ns->replication_factor, claims, target_claims,
							ns->rack_ids, n_racks);
				}
				else if (n_racks != 1) {
					rack_aware_adjust_row(ns_node_seq, ns_sl_ix,
							ns->replication_factor, ns->rack_ids,
							ns->active_size, n_racks, 1);
				}
			}

			cf_mutex_lock(&p->lock);

			p->working_master = (cf_node)0;

			p->n_replicas = ns->replication_factor;
			memcpy(p->replicas, ns_node_seq, p->n_replicas * sizeof(cf_node));

			p->n_dupl = 0;

			p->pending_emigrations = 0;
			p->pending_lead_emigrations = 0;
			p->pending_immigrations = 0;

			memset(p->immigrators, 0, ns->replication_factor * sizeof(bool));

			p->n_witnesses = 0;

			uint32_t self_n = find_self(ns_node_seq, ns);

			as_partition_version final_version = {
					.ckey = as_exchange_cluster_key(),
					.master = self_n == 0 ? 1 : 0
			};

			p->final_version = final_version;

			int working_master_n = find_working_master_ap(p, ns_sl_ix, ns);

			uint32_t n_dupl = 0;
			cf_node dupls[ns->cluster_size];

			as_partition_version orig_version = p->version;

			// TEMPORARY debugging.
			uint32_t debug_n_immigrators = 0;

			if (working_master_n == -1) {
				// No existing versions - assign fresh version to replicas.
				working_master_n = 0;

				if (self_n < p->n_replicas) {
					p->version = p->final_version;
				}

				ns_fresh_partitions++;
			}
			else {
				n_dupl = find_duplicates_ap(p, ns_node_seq, ns_sl_ix, ns,
						(uint32_t)working_master_n, dupls);

				uint32_t n_immigrators = fill_immigrators(p, ns_sl_ix, ns,
						(uint32_t)working_master_n, n_dupl);

				// TEMPORARY debugging.
				debug_n_immigrators = n_immigrators;

				if (n_immigrators != 0) {
					// Migrations required - advance versions for next
					// rebalance, queue migrations for this rebalance.

					advance_version_ap(p, ns_sl_ix, ns, self_n,
							(uint32_t)working_master_n, n_dupl, dupls);

					uint32_t lead_flags[ns->replication_factor];

					emig_lead_flags_ap(p, ns_sl_ix, ns, lead_flags);

					queue_namespace_migrations(p, ns, self_n,
							ns_node_seq[working_master_n], n_dupl, dupls,
							lead_flags, mq);

					if (self_n == 0) {
						fill_witnesses(p, ns_node_seq, ns_sl_ix, ns);
						ns_pending_signals += p->n_witnesses;
					}
				}
				else if (self_n < p->n_replicas) {
					// No migrations required - refresh replicas' versions
					// (only truly necessary if replication factor decreased).
					p->version = p->final_version;
				}
				else {
					// No migrations required - drop superfluous non-replica
					// partitions immediately.
					if (! drop_superfluous_version(p, ns)) {
						// Quiesced nodes become subset of final version.
						adjust_superfluous_version(p, ns);
					}
				}
			}

			if (self_n == 0 || (int)self_n == working_master_n) {
				p->working_master = ns_node_seq[working_master_n];
			}

			handle_version_change(p, ns, &orig_version);

			ns_pending_emigrations += p->pending_emigrations;
			ns_pending_lead_emigrations += p->pending_lead_emigrations;
			ns_pending_immigrations += p->pending_immigrations;

			// TEMPORARY debugging.
			if (pid < 20) {
				cf_debug(AS_PARTITION, "ck%012lX %02u (%hu %hu) %s -> %s - self_n %u wm_n %d repls %u dupls %u immigrators %u",
						as_exchange_cluster_key(), pid, p->pending_emigrations,
						p->pending_immigrations,
						VERSION_AS_STRING(&orig_version),
						VERSION_AS_STRING(&p->version), self_n,
						working_master_n, p->n_replicas, n_dupl,
						debug_n_immigrators);
			}

			client_replica_maps_update(ns, pid);
		}

		// Flush partition metadata for this group of partitions ...
		as_storage_flush_pmeta(ns, start_pid, PIDS_PER_GROUP);

		// ... and unlock the group.
		for (uint32_t pid = start_pid; pid < end_pid; pid++) {
			as_partition* p = &ns->partitions[pid];

			cf_mutex_unlock(&p->lock);
		}
	}

	cf_info(AS_PARTITION, "{%s} rebalanced: expected-migrations (%u,%u,%u) fresh-partitions %u",
			ns->name, ns_pending_emigrations, ns_pending_immigrations,
			ns_pending_signals, ns_fresh_partitions);

	ns->n_unavailable_partitions = 0;

	ns->migrate_tx_partitions_initial = ns_pending_emigrations;
	ns->migrate_tx_partitions_remaining = ns_pending_emigrations;
	ns->migrate_tx_partitions_lead_remaining = ns_pending_lead_emigrations;

	ns->migrate_rx_partitions_initial = ns_pending_immigrations;
	ns->migrate_rx_partitions_remaining = ns_pending_immigrations;

	ns->migrate_signals_remaining = ns_pending_signals;
}

void
apply_single_replica_limit_ap(as_namespace* ns)
{
	// Replication factor can't be bigger than observed cluster.
	uint32_t repl_factor = ns->cluster_size < ns->cfg_replication_factor ?
			ns->cluster_size : ns->cfg_replication_factor;

	// Reduce the replication factor to 1 if the cluster size is less than or
	// equal to the specified limit.
	ns->replication_factor =
			ns->cluster_size <= g_config.paxos_single_replica_limit ?
					1 : repl_factor;

	cf_info(AS_PARTITION, "{%s} replication factor is %u", ns->name,
			ns->replication_factor);
}

void
fill_translation(int translation[], const as_namespace* ns)
{
	int ns_n = 0;

	for (uint32_t full_n = 0; full_n < g_cluster_size; full_n++) {
		translation[full_n] = ns_n < (int)ns->cluster_size &&
				g_succession[full_n] == ns->succession[ns_n] ? ns_n++ : -1;
	}
}

void
fill_namespace_rows(const cf_node* full_node_seq, const sl_ix_t* full_sl_ix,
		cf_node* ns_node_seq, sl_ix_t* ns_sl_ix, const as_namespace* ns,
		const int translation[])
{
	if (ns->cluster_size == g_cluster_size) {
		// Rack-aware but namespace is on all nodes - just copy. Rack-aware
		// will rearrange the copies - we can't rearrange the global originals.
		memcpy(ns_node_seq, full_node_seq, g_cluster_size * sizeof(cf_node));
		memcpy(ns_sl_ix, full_sl_ix, g_cluster_size * sizeof(sl_ix_t));

		return;
	}

	// Fill namespace sequences from global table rows using translation array.
	uint32_t n = 0;

	for (uint32_t full_n = 0; full_n < g_cluster_size; full_n++) {
		int ns_n = translation[full_sl_ix[full_n]];

		if (ns_n != -1) {
			ns_node_seq[n] = ns->succession[ns_n];
			ns_sl_ix[n] = (sl_ix_t)ns_n;
			n++;
		}
	}
}

uint32_t
find_self(const cf_node* ns_node_seq, const as_namespace* ns)
{
	int n = index_of_node(ns_node_seq, ns->cluster_size, g_config.self_node);

	cf_assert(n != -1, AS_PARTITION, "{%s} self node not in succession list",
			ns->name);

	return (uint32_t)n;
}

// Preference: Vm > V > Ve > Vs > Vse > absent. (At compatibility id 4 and
// later, Vs outranks Ve - see the scoring below.)
int
find_working_master_ap(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns)
{
	int best_n = -1;
	int best_score = -1;

	for (int n = 0; n < (int)ns->cluster_size; n++) {
		const as_partition_version* version = INPUT_VERSION(n);

		// Skip versions with no data.
		if (! as_partition_version_has_data(version)) {
			continue;
		}

		// If previous working master exists, use it. (There can be more than
		// one after split brains. Also, the flag is only to prevent
		// superfluous master swaps on rebalance when rack-aware.)
		if (version->master == 1) {
			return shift_working_master(p, ns_sl_ix, ns, n, version);
		}
		// else - keep going but remember the best so far.

		int score;

		if (as_exchange_min_compatibility_id() >= 4) {
			// V = 3 > Vs = 2 > Ve = 1 > Vse = 0.
			score = 3 - ((version->subset == 1 ? 1 : 0) +
					(version->evade == 1 ? 2 : 0));
		}
		else {
			// V = 3 > Ve = 2 > Vs = 1 > Vse = 0.
			score = (version->evade == 1 ? 0 : 1) +
					(version->subset == 1 ? 0 : 2);
		}

		if (score > best_score) {
			best_score = score;
			best_n = n;
		}
	}

	return best_n;
}

int
shift_working_master(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns, int working_master_n,
		const as_partition_version* working_master_version)
{
	if (working_master_n == 0 || working_master_version->subset == 1) {
		return working_master_n; // can only shift full masters
	}

	for (int n = 0; n < working_master_n; n++) {
		const as_partition_version* version = INPUT_VERSION(n);

		if (is_same_as_full_master(working_master_version, version)) {
			return n; // master flag will get shifted later
		}
	}

	return working_master_n;
}

uint32_t
find_duplicates_ap(const as_partition* p, const cf_node* ns_node_seq,
		const sl_ix_t* ns_sl_ix, const as_namespace* ns,
		uint32_t working_master_n, cf_node dupls[])
{
	uint32_t n_dupl = 0;
	as_partition_version parent_dupl_versions[ns->cluster_size];

	memset(parent_dupl_versions, 0, sizeof(parent_dupl_versions));

	for (uint32_t n = 0; n < ns->cluster_size; n++) {
		const as_partition_version* version = INPUT_VERSION(n);

		// Skip versions without data, and postpone subsets to next pass.
		if (! as_partition_version_has_data(version) || version->subset == 1) {
			continue;
		}

		// Every unique version is a duplicate.
		if (version->family == VERSION_FAMILY_UNIQUE) {
			dupls[n_dupl++] = ns_node_seq[n];
			continue;
		}

		// Add parent versions as duplicates, unless they are already in.

		uint32_t d;

		for (d = 0; d < n_dupl; d++) {
			if (is_family_same(&parent_dupl_versions[d], version)) {
				break;
			}
		}

		if (d == n_dupl) {
			// Not in dupls.
			parent_dupl_versions[n_dupl] = *version;
			dupls[n_dupl++] = ns_node_seq[n];
		}
	}

	// Second pass to deal with subsets.
	for (uint32_t n = 0; n < ns->cluster_size; n++) {
		const as_partition_version* version = INPUT_VERSION(n);

		if (version->subset == 0) {
			continue;
		}

		uint32_t d;

		for (d = 0; d < n_dupl; d++) {
			if (is_family_same(&parent_dupl_versions[d], version)) {
				break;
			}
		}

		if (d == n_dupl) {
			// Not in dupls.
			// Leave 0 in parent_dupl_versions array.
			dupls[n_dupl++] = ns_node_seq[n];
		}
	}

	// Remove working master from 'variants' to leave duplicates.
	return remove_node(dupls, n_dupl, ns_node_seq[working_master_n]);
}

uint32_t
fill_immigrators(as_partition* p, const sl_ix_t* ns_sl_ix, as_namespace* ns,
		uint32_t working_master_n, uint32_t n_dupl)
{
	uint32_t n_immigrators = 0;

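	// With duplicates present, every replica must immigrate the merged
	// result; otherwise only replicas lacking a full version (no data, or a
	// subset) need to immigrate.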
	for (uint32_t repl_ix = 0; repl_ix < p->n_replicas; repl_ix++) {
		const as_partition_version* version = INPUT_VERSION(repl_ix);

		if (n_dupl != 0 || (repl_ix != working_master_n &&
				(! as_partition_version_has_data(version) ||
						version->subset == 1))) {
			p->immigrators[repl_ix] = true;
			n_immigrators++;
		}
	}

	return n_immigrators;
}

void
advance_version_ap(as_partition* p, const sl_ix_t* ns_sl_ix, as_namespace* ns,
		uint32_t self_n, uint32_t working_master_n, uint32_t n_dupl,
		const cf_node dupls[])
{
	// Advance working master.
	if (self_n == working_master_n) {
		p->version.ckey = p->final_version.ckey;
		p->version.family = (self_n == 0 || n_dupl == 0) ? 0 : 1;
		p->version.master = 1;
		p->version.subset = 0;
		p->version.evade = 0;

		return;
	}

	p->version.master = 0;

	bool self_is_versionless = ! as_partition_version_has_data(&p->version);

	// Advance eventual master.
	if (self_n == 0) {
		p->version.ckey = p->final_version.ckey;
		p->version.family = 0;
		p->version.subset = n_dupl == 0 ? 1 : 0;

		if (self_is_versionless) {
			p->version.evade = 1;
		}
		// else - don't change evade flag.

		return;
	}

	// Advance version-less proles and non-replicas (common case).
	if (self_is_versionless) {
		if (self_n < p->n_replicas) {
			p->version.ckey = p->final_version.ckey;
			p->version.family = 0;
			p->version.subset = 1;
			p->version.evade = 1;
		}
		// else - non-replicas remain version-less.

		return;
	}

	// Fill family versions.

	uint32_t max_n_families = p->n_replicas + 1;

	if (max_n_families > AS_PARTITION_N_FAMILIES) {
		max_n_families = AS_PARTITION_N_FAMILIES;
	}

	as_partition_version family_versions[max_n_families];
	uint32_t n_families = fill_family_versions(p, ns_sl_ix, ns,
			working_master_n, n_dupl, dupls, family_versions);

	uint32_t family = find_family(&p->version, n_families, family_versions);

	// Advance non-masters with prior versions ...

	// ... proles ...
	if (self_n < p->n_replicas) {
		p->version.ckey = p->final_version.ckey;
		p->version.family = family;

		if (n_dupl != 0 && p->version.family == 0) {
			p->version.subset = 1;
		}
		// else - don't change either subset or evade flag.

		return;
	}

	// ... or non-replicas.
	if (family != VERSION_FAMILY_UNIQUE &&
			family_versions[family].subset == 0) {
		p->version.ckey = p->final_version.ckey;
		p->version.family = family;
		p->version.subset = 1;
	}
	// else - leave version as-is.
}

uint32_t
fill_family_versions(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns, uint32_t working_master_n, uint32_t n_dupl,
		const cf_node dupls[], as_partition_version family_versions[])
{
	uint32_t n_families = 1;
	const as_partition_version* final_master_version = INPUT_VERSION(0);

	family_versions[0] = *final_master_version;

	if (working_master_n != 0) {
		const as_partition_version* working_master_version =
				INPUT_VERSION(working_master_n);

		if (n_dupl == 0) {
			family_versions[0] = *working_master_version;
		}
		else {
			family_versions[0] = p->final_version; // not matchable
			family_versions[1] = *working_master_version;
			n_families = 2;
		}
	}

	for (uint32_t repl_ix = 1;
			repl_ix < p->n_replicas && n_families < AS_PARTITION_N_FAMILIES;
			repl_ix++) {
		if (repl_ix == working_master_n) {
			continue;
		}

		const as_partition_version* version = INPUT_VERSION(repl_ix);

		if (contains_node(dupls, n_dupl, p->replicas[repl_ix])) {
			family_versions[n_families++] = *version;
		}
		else if (version->subset == 1 &&
				! has_replica_parent(p, ns_sl_ix, ns, version, repl_ix)) {
			family_versions[n_families++] = *version;
		}
	}

	return n_families;
}

bool
has_replica_parent(const as_partition* p, const sl_ix_t* ns_sl_ix,
		const as_namespace* ns, const as_partition_version* subset_version,
		uint32_t subset_n)
{
	for (uint32_t repl_ix = 1; repl_ix < p->n_replicas; repl_ix++) {
		if (repl_ix == subset_n) {
			continue;
		}

		const as_partition_version* version = INPUT_VERSION(repl_ix);

		if (version->subset == 0 && is_family_same(version, subset_version)) {
			return true;
		}
	}

	return false;
}

uint32_t
find_family(const as_partition_version* self_version, uint32_t n_families,
		const as_partition_version family_versions[])
{
	for (uint32_t n = 0; n < n_families; n++) {
		if (is_family_same(self_version, &family_versions[n])) {
			return n;
		}
	}

	return VERSION_FAMILY_UNIQUE;
}

void
queue_namespace_migrations(as_partition* p, as_namespace* ns, uint32_t self_n,
		cf_node working_master, uint32_t n_dupl, cf_node dupls[],
		const uint32_t lead_flags[], cf_queue* mq)
{
	pb_task task;

	if (self_n == 0) {
		// <><><><><><> Final Master <><><><><><>

		if (g_config.self_node == working_master) {
			p->pending_immigrations = (uint16_t)n_dupl;
		}
		else {
			// Remove self from duplicates.
			n_dupl = remove_node(dupls, n_dupl, g_config.self_node);

			p->pending_immigrations = (uint16_t)n_dupl + 1;
		}

		if (n_dupl != 0) {
			p->n_dupl = n_dupl;
			memcpy(p->dupls, dupls, n_dupl * sizeof(cf_node));
		}

		if (p->pending_immigrations != 0) {
			for (uint32_t repl_ix = 1; repl_ix < p->n_replicas; repl_ix++) {
				if (p->immigrators[repl_ix]) {
					p->pending_emigrations++;
				}
			}

			// Emigrate later, after all immigration is complete.
			return;
		}

		// Emigrate now, no immigrations to wait for.
		for (uint32_t repl_ix = 1; repl_ix < p->n_replicas; repl_ix++) {
			if (p->immigrators[repl_ix]) {
				p->pending_emigrations++;

				if (lead_flags[repl_ix] != TX_FLAGS_NONE) {
					p->pending_lead_emigrations++;
				}

				pb_task_init(&task, p->replicas[repl_ix], ns, p->id,
						as_exchange_cluster_key(), PB_TASK_EMIG_TRANSFER,
						lead_flags[repl_ix]);
				cf_queue_push(mq, &task);
			}
		}

		return;
	}
	// else - <><><><><><> Not Final Master <><><><><><>

	if (g_config.self_node == working_master) {
		if (n_dupl != 0) {
			p->n_dupl = n_dupl;
			memcpy(p->dupls, dupls, n_dupl * sizeof(cf_node));
		}

		p->pending_emigrations = 1;

		if (lead_flags[0] != TX_FLAGS_NONE) {
			p->pending_lead_emigrations = 1;
		}

		pb_task_init(&task, p->replicas[0], ns, p->id,
				as_exchange_cluster_key(), PB_TASK_EMIG_TRANSFER,
				TX_FLAGS_ACTING_MASTER | lead_flags[0]);
		cf_queue_push(mq, &task);
	}
	else if (contains_self(dupls, n_dupl)) {
		p->pending_emigrations = 1;

		if (lead_flags[0] != TX_FLAGS_NONE) {
			p->pending_lead_emigrations = 1;
		}

		pb_task_init(&task, p->replicas[0], ns, p->id,
				as_exchange_cluster_key(), PB_TASK_EMIG_TRANSFER,
				lead_flags[0]);
		cf_queue_push(mq, &task);
	}

	if (self_n < p->n_replicas && p->immigrators[self_n]) {
		p->pending_immigrations = 1;
	}
}

void
fill_witnesses(as_partition* p, const cf_node* ns_node_seq,
		const sl_ix_t* ns_sl_ix, as_namespace* ns)
{
	for (uint32_t n = 1; n < ns->cluster_size; n++) {
		const as_partition_version* version = INPUT_VERSION(n);

		// Note - 0e/0r versions (CP) are witnesses.
		if (n < p->n_replicas || ! as_partition_version_is_null(version)) {
			p->witnesses[p->n_witnesses++] = ns_node_seq[n];
		}
	}
}

// If version changed, create/drop trees as appropriate, and cache for storage.
void
handle_version_change(as_partition* p, struct as_namespace_s* ns,
		as_partition_version* orig_version)
{
	if (as_partition_version_same(&p->version, orig_version)) {
		return;
	}

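	// Version went from no data to data - create the tree.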
	if (! as_partition_version_has_data(orig_version) &&
			as_partition_version_has_data(&p->version)) {
		create_trees(p, ns);
	}

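	// Version went from data to no data - drop the tree.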
	if (as_partition_version_has_data(orig_version) &&
			! as_partition_version_has_data(&p->version)) {
		// FIXME - temporary paranoia.
		cf_assert(p->tree, AS_PARTITION, "unexpected - null tree");
		drop_trees(p);
	}

	as_storage_cache_pmeta(ns, p);
}


//==========================================================
// Local helpers - migration-related as_partition methods.
//

// Sanity checks for immigration commands.
bool
partition_immigration_is_valid(const as_partition* p, cf_node source_node,
		const as_namespace* ns, const char* tag)
{
	char* failure_reason = NULL;

	if (p->pending_immigrations == 0) {
		failure_reason = "no immigrations expected";
	}
	else if (is_self_final_master(p)) {
		if (source_node != p->working_master &&
				! contains_node(p->dupls, p->n_dupl, source_node)) {
			failure_reason = "final master's source not acting master or duplicate";
		}
	}
	else if (source_node != p->replicas[0]) {
		failure_reason = "prole's source not final working master";
	}

	if (failure_reason) {
		cf_warning(AS_PARTITION, "{%s:%u} immigrate_%s - source %lx working-master %lx pending-immigrations %hu - %s",
				ns->name, p->id, tag, source_node, p->working_master,
				p->pending_immigrations, failure_reason);

		return false;
	}

	return true;
}

void
emigrate_done_advance_non_master_version_ap(as_namespace* ns, as_partition* p,
		uint32_t tx_flags)
{
	if ((tx_flags & TX_FLAGS_ACTING_MASTER) != 0) {
		p->working_master = (cf_node)0;
		p->n_dupl = 0;
		p->version.master = 0;
	}

	p->version.ckey = p->final_version.ckey;
	p->version.family = 0;

	if (p->pending_immigrations != 0 || ! is_self_replica(p)) {
		p->version.subset = 1;
	}
	// else - must already be a parent.

	as_storage_save_pmeta(ns, p);
}

void
immigrate_start_advance_non_master_version_ap(as_partition* p)
{
	// Become subset of final version if not already such.
	if (! (p->version.ckey == p->final_version.ckey &&
			p->version.family == 0 && p->version.subset == 1)) {
		p->version.ckey = p->final_version.ckey;
		p->version.family = 0;
		p->version.master = 0; // racing emigrate done if we were acting master
		p->version.subset = 1;
		// Leave evade flag as-is.
	}
}

void
immigrate_done_advance_final_master_version_ap(as_namespace* ns,
		as_partition* p)
{
	if (! as_partition_version_same(&p->version, &p->final_version)) {
		p->version = p->final_version;
		as_storage_save_pmeta(ns, p);
	}
}