| 1 | /* |
| 2 | * partition.c |
| 3 | * |
| 4 | * Copyright (C) 2008-2018 Aerospike, Inc. |
| 5 | * |
| 6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
| 7 | * license agreements. |
| 8 | * |
| 9 | * This program is free software: you can redistribute it and/or modify it under |
| 10 | * the terms of the GNU Affero General Public License as published by the Free |
| 11 | * Software Foundation, either version 3 of the License, or (at your option) any |
| 12 | * later version. |
| 13 | * |
| 14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
| 15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
| 17 | * details. |
| 18 | * |
| 19 | * You should have received a copy of the GNU Affero General Public License |
| 20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
| 21 | */ |
| 22 | |
| 23 | //========================================================== |
| 24 | // Includes. |
| 25 | // |
| 26 | |
| 27 | #include "fabric/partition.h" |
| 28 | |
| 29 | #include <stdbool.h> |
| 30 | #include <stddef.h> |
| 31 | #include <stdint.h> |
| 32 | #include <string.h> |
| 33 | |
| 34 | #include "citrusleaf/alloc.h" |
| 35 | #include "citrusleaf/cf_atomic.h" |
| 36 | #include "citrusleaf/cf_b64.h" |
| 37 | |
| 38 | #include "cf_mutex.h" |
| 39 | #include "fault.h" |
| 40 | #include "node.h" |
| 41 | |
| 42 | #include "base/cfg.h" |
| 43 | #include "base/datamodel.h" |
| 44 | #include "base/index.h" |
| 45 | #include "base/proto.h" |
| 46 | #include "base/transaction.h" |
| 47 | #include "fabric/partition_balance.h" |
| 48 | |
| 49 | |
| 50 | //========================================================== |
| 51 | // Forward declarations. |
| 52 | // |
| 53 | |
| 54 | cf_node find_best_node(const as_partition* p, bool is_read); |
| 55 | void accumulate_replica_stats(const as_partition* p, uint64_t* p_n_objects, uint64_t* p_n_tombstones); |
| 56 | void partition_reserve_lockfree(as_partition* p, as_namespace* ns, as_partition_reservation* rsv); |
| 57 | char partition_descriptor(const as_partition* p); |
| 58 | int partition_get_replica_self_lockfree(const as_namespace* ns, uint32_t pid); |
| 59 | bool should_working_master_own(const as_namespace* ns, uint32_t pid, uint32_t repl_ix); |
| 60 | |
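// Note - helpers used below without local declarations (find_self_in_replicas(),
// is_self_replica(), partition_reserve_unavailable(), partition_reserve_promote())
// are presumed declared in the included headers, e.g. fabric/partition.h.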
| 61 | |
| 62 | //========================================================== |
| 63 | // Public API. |
| 64 | // |
| 65 | |
| 66 | void |
| 67 | as_partition_init(as_namespace* ns, uint32_t pid) |
| 68 | { |
| 69 | as_partition* p = &ns->partitions[pid]; |
| 70 | |
| 71 | // Note - as_partition has been zeroed since it's a member of as_namespace. |
| 72 | // Set non-zero members. |
| 73 | |
| 74 | cf_mutex_init(&p->lock); |
| 75 | |
| 76 | p->id = pid; |
| 77 | |
| 78 | if (ns->cold_start) { |
| 79 | return; // trees are created later when we know which ones we own |
| 80 | } |
| 81 | |
| 82 | p->tree = as_index_tree_resume(&ns->tree_shared, ns->xmem_trees, pid, |
| 83 | as_partition_tree_done, (void*)p); |
| 84 | } |
| 85 | |
| 86 | void |
| 87 | as_partition_shutdown(as_namespace* ns, uint32_t pid) |
| 88 | { |
| 89 | as_partition* p = &ns->partitions[pid]; |
| 90 | |
| 91 | while (true) { |
| 92 | cf_mutex_lock(&p->lock); |
| 93 | |
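// Reserve the tree so it can't be destroyed while we block it below,
// outside the partition lock.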
| 94 | as_index_tree* tree = p->tree; |
| 95 | as_index_tree_reserve(tree); |
| 96 | |
| 97 | cf_mutex_unlock(&p->lock); |
| 98 | |
| 99 | // Must come outside partition lock, since transactions may take |
| 100 | // partition lock under the record (sprig) lock. |
| 101 | as_index_tree_block(tree); |
| 102 | |
// Re-lock - if we're lucky, this stays locked (for good) and shutdown of
// this partition is complete.
| 104 | cf_mutex_lock(&p->lock); |
| 105 | |
| 106 | if (tree == p->tree) { |
| 107 | break; // lucky - same tree we blocked |
| 108 | } |
| 109 | |
| 110 | // Bad luck - blocked a tree that just got switched, block the new one. |
| 111 | cf_mutex_unlock(&p->lock); |
| 112 | } |
| 113 | } |
| 114 | |
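// Clear ownership and migration state - with n_replicas 0, the partition is
// treated as unavailable (see e.g. as_partition_writable_node() below) until
// a rebalance presumably repopulates these fields.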
| 115 | void |
| 116 | as_partition_freeze(as_partition* p) |
| 117 | { |
| 118 | p->working_master = (cf_node)0; |
| 119 | |
| 120 | p->n_nodes = 0; |
| 121 | p->n_replicas = 0; |
| 122 | p->n_dupl = 0; |
| 123 | |
| 124 | p->pending_emigrations = 0; |
| 125 | p->pending_lead_emigrations = 0; |
| 126 | p->pending_immigrations = 0; |
| 127 | |
| 128 | p->n_witnesses = 0; |
| 129 | } |
| 130 | |
// Get a list of all nodes (excluding self) that are replicas for a specified
// partition - fill the caller's nv array and return the number of nodes found.
| 133 | uint32_t |
| 134 | as_partition_get_other_replicas(as_partition* p, cf_node* nv) |
| 135 | { |
| 136 | uint32_t n_other_replicas = 0; |
| 137 | |
| 138 | cf_mutex_lock(&p->lock); |
| 139 | |
| 140 | for (uint32_t repl_ix = 0; repl_ix < p->n_replicas; repl_ix++) { |
| 141 | // Don't ever include yourself. |
| 142 | if (p->replicas[repl_ix] == g_config.self_node) { |
| 143 | continue; |
| 144 | } |
| 145 | |
| 146 | // Copy the node ID into the user-supplied vector. |
| 147 | nv[n_other_replicas++] = p->replicas[repl_ix]; |
| 148 | } |
| 149 | |
| 150 | cf_mutex_unlock(&p->lock); |
| 151 | |
| 152 | return n_other_replicas; |
| 153 | } |
| 154 | |
| 155 | cf_node |
| 156 | as_partition_writable_node(as_namespace* ns, uint32_t pid) |
| 157 | { |
| 158 | as_partition* p = &ns->partitions[pid]; |
| 159 | |
| 160 | cf_mutex_lock(&p->lock); |
| 161 | |
| 162 | if (p->n_replicas == 0) { |
| 163 | // This partition is unavailable. |
| 164 | cf_mutex_unlock(&p->lock); |
| 165 | return (cf_node)0; |
| 166 | } |
| 167 | |
| 168 | cf_node best_node = find_best_node(p, false); |
| 169 | |
| 170 | cf_mutex_unlock(&p->lock); |
| 171 | |
| 172 | return best_node; |
| 173 | } |
| 174 | |
| 175 | // If this node is an eventual master, return the acting master, else return 0. |
| 176 | cf_node |
| 177 | as_partition_proxyee_redirect(as_namespace* ns, uint32_t pid) |
| 178 | { |
| 179 | as_partition* p = &ns->partitions[pid]; |
| 180 | |
| 181 | cf_mutex_lock(&p->lock); |
| 182 | |
| 183 | cf_node node = (cf_node)0; |
| 184 | |
| 185 | if (g_config.self_node == p->replicas[0] && |
| 186 | g_config.self_node != p->working_master) { |
| 187 | node = p->working_master; |
| 188 | } |
| 189 | |
| 190 | cf_mutex_unlock(&p->lock); |
| 191 | |
| 192 | return node; |
| 193 | } |
| 194 | |
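// Output format, per namespace: <name>:<column-0 (master) b64 bitmap>;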
| 195 | void |
| 196 | as_partition_get_replicas_master_str(cf_dyn_buf* db) |
| 197 | { |
| 198 | size_t db_sz = db->used_sz; |
| 199 | |
| 200 | for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) { |
| 201 | as_namespace* ns = g_config.namespaces[ns_ix]; |
| 202 | |
| 203 | cf_dyn_buf_append_string(db, ns->name); |
| 204 | cf_dyn_buf_append_char(db, ':'); |
| 205 | cf_dyn_buf_append_buf(db, (uint8_t*)ns->replica_maps[0].b64map, |
| 206 | sizeof(ns->replica_maps[0].b64map)); |
| 207 | cf_dyn_buf_append_char(db, ';'); |
| 208 | } |
| 209 | |
| 210 | if (db_sz != db->used_sz) { |
| 211 | cf_dyn_buf_chomp(db); |
| 212 | } |
| 213 | } |
| 214 | |
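// Output format, per namespace:
// <name>:[<regime>,]<repl-factor>,<b64 bitmap column 0>,...,<column n-1>;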
| 215 | void |
| 216 | as_partition_get_replicas_all_str(cf_dyn_buf* db, bool include_regime) |
| 217 | { |
| 218 | size_t db_sz = db->used_sz; |
| 219 | |
| 220 | for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) { |
| 221 | as_namespace* ns = g_config.namespaces[ns_ix]; |
| 222 | |
| 223 | cf_dyn_buf_append_string(db, ns->name); |
| 224 | cf_dyn_buf_append_char(db, ':'); |
| 225 | |
| 226 | if (include_regime) { |
| 227 | cf_dyn_buf_append_uint32(db, ns->rebalance_regime); |
| 228 | cf_dyn_buf_append_char(db, ','); |
| 229 | } |
| 230 | |
| 231 | uint32_t repl_factor = ns->replication_factor; |
| 232 | |
| 233 | // If we haven't rebalanced yet, report 1 column with no ownership. |
| 234 | if (repl_factor == 0) { |
| 235 | repl_factor = 1; |
| 236 | } |
| 237 | |
| 238 | cf_dyn_buf_append_uint32(db, repl_factor); |
| 239 | |
| 240 | for (uint32_t repl_ix = 0; repl_ix < repl_factor; repl_ix++) { |
| 241 | cf_dyn_buf_append_char(db, ','); |
| 242 | cf_dyn_buf_append_buf(db, |
(uint8_t*)ns->replica_maps[repl_ix].b64map,
| 244 | sizeof(ns->replica_maps[repl_ix].b64map)); |
| 245 | } |
| 246 | |
| 247 | cf_dyn_buf_append_char(db, ';'); |
| 248 | } |
| 249 | |
| 250 | if (db_sz != db->used_sz) { |
| 251 | cf_dyn_buf_chomp(db); |
| 252 | } |
| 253 | } |
| 254 | |
| 255 | void |
| 256 | as_partition_get_replica_stats(as_namespace* ns, repl_stats* p_stats) |
| 257 | { |
| 258 | memset(p_stats, 0, sizeof(repl_stats)); |
| 259 | |
| 260 | for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) { |
| 261 | as_partition* p = &ns->partitions[pid]; |
| 262 | |
| 263 | cf_mutex_lock(&p->lock); |
| 264 | |
| 265 | if (g_config.self_node == p->working_master) { |
| 266 | accumulate_replica_stats(p, |
| 267 | &p_stats->n_master_objects, |
| 268 | &p_stats->n_master_tombstones); |
| 269 | } |
| 270 | else if (find_self_in_replicas(p) >= 0) { // -1 if not |
| 271 | accumulate_replica_stats(p, |
| 272 | &p_stats->n_prole_objects, |
| 273 | &p_stats->n_prole_tombstones); |
| 274 | } |
| 275 | else { |
| 276 | accumulate_replica_stats(p, |
| 277 | &p_stats->n_non_replica_objects, |
| 278 | &p_stats->n_non_replica_tombstones); |
| 279 | } |
| 280 | |
| 281 | cf_mutex_unlock(&p->lock); |
| 282 | } |
| 283 | } |
| 284 | |
| 285 | // TODO - what if partition is unavailable? |
| 286 | void |
| 287 | as_partition_reserve(as_namespace* ns, uint32_t pid, |
| 288 | as_partition_reservation* rsv) |
| 289 | { |
| 290 | as_partition* p = &ns->partitions[pid]; |
| 291 | |
| 292 | cf_mutex_lock(&p->lock); |
| 293 | |
| 294 | partition_reserve_lockfree(p, ns, rsv); |
| 295 | |
| 296 | cf_mutex_unlock(&p->lock); |
| 297 | } |
| 298 | |
| 299 | int |
| 300 | as_partition_reserve_replica(as_namespace* ns, uint32_t pid, |
| 301 | as_partition_reservation* rsv) |
| 302 | { |
| 303 | as_partition* p = &ns->partitions[pid]; |
| 304 | |
| 305 | cf_mutex_lock(&p->lock); |
| 306 | |
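// Not a replica - the sender's view is out of date, so answer with a
// cluster-key mismatch (presumably prompting the sender to refresh and
// re-route).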
| 307 | if (! is_self_replica(p)) { |
| 308 | cf_mutex_unlock(&p->lock); |
| 309 | return AS_ERR_CLUSTER_KEY_MISMATCH; |
| 310 | } |
| 311 | |
| 312 | partition_reserve_lockfree(p, ns, rsv); |
| 313 | |
| 314 | cf_mutex_unlock(&p->lock); |
| 315 | |
| 316 | return AS_OK; |
| 317 | } |
| 318 | |
| 319 | // Returns: |
| 320 | // 0 - reserved - node parameter returns self node |
| 321 | // -1 - not reserved - node parameter returns other "better" node |
| 322 | // -2 - not reserved - node parameter not filled - partition is unavailable |
| 323 | int |
| 324 | as_partition_reserve_write(as_namespace* ns, uint32_t pid, |
| 325 | as_partition_reservation* rsv, cf_node* node) |
| 326 | { |
| 327 | as_partition* p = &ns->partitions[pid]; |
| 328 | |
| 329 | cf_mutex_lock(&p->lock); |
| 330 | |
| 331 | // If this partition is frozen, return. |
| 332 | if (p->n_replicas == 0) { |
| 333 | if (node) { |
| 334 | *node = (cf_node)0; |
| 335 | } |
| 336 | |
| 337 | cf_mutex_unlock(&p->lock); |
| 338 | return -2; |
| 339 | } |
| 340 | |
| 341 | cf_node best_node = find_best_node(p, false); |
| 342 | |
| 343 | if (node) { |
| 344 | *node = best_node; |
| 345 | } |
| 346 | |
| 347 | // If this node is not the appropriate one, return. |
| 348 | if (best_node != g_config.self_node) { |
| 349 | cf_mutex_unlock(&p->lock); |
| 350 | return -1; |
| 351 | } |
| 352 | |
| 353 | partition_reserve_lockfree(p, ns, rsv); |
| 354 | |
| 355 | cf_mutex_unlock(&p->lock); |
| 356 | |
| 357 | return 0; |
| 358 | } |
| 359 | |
| 360 | // Returns: |
| 361 | // 0 - reserved - node parameter returns self node |
| 362 | // -1 - not reserved - node parameter returns other "better" node |
| 363 | // -2 - not reserved - node parameter not filled - partition is unavailable |
| 364 | int |
| 365 | as_partition_reserve_read_tr(as_namespace* ns, uint32_t pid, as_transaction* tr, |
| 366 | cf_node* node) |
| 367 | { |
| 368 | as_partition* p = &ns->partitions[pid]; |
| 369 | |
| 370 | cf_mutex_lock(&p->lock); |
| 371 | |
| 372 | // Handle unavailable partition. |
| 373 | if (p->n_replicas == 0) { |
| 374 | int result = partition_reserve_unavailable(ns, p, tr, node); |
| 375 | |
| 376 | if (result == 0) { |
| 377 | partition_reserve_lockfree(p, ns, &tr->rsv); |
| 378 | } |
| 379 | |
| 380 | cf_mutex_unlock(&p->lock); |
| 381 | return result; |
| 382 | } |
| 383 | |
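// If partition_reserve_promote() says this read must be handled like a
// write (presumably e.g. to resolve duplicate versions - an assumption
// here), don't allow the relaxed prole-read path in find_best_node().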
| 384 | cf_node best_node = find_best_node(p, |
| 385 | ! partition_reserve_promote(ns, p, tr)); |
| 386 | |
| 387 | if (node) { |
| 388 | *node = best_node; |
| 389 | } |
| 390 | |
| 391 | // If this node is not the appropriate one, return. |
| 392 | if (best_node != g_config.self_node) { |
| 393 | cf_mutex_unlock(&p->lock); |
| 394 | return -1; |
| 395 | } |
| 396 | |
| 397 | partition_reserve_lockfree(p, ns, &tr->rsv); |
| 398 | |
| 399 | cf_mutex_unlock(&p->lock); |
| 400 | |
| 401 | return 0; |
| 402 | } |
| 403 | |
| 404 | // Reserves all query-able partitions. |
| 405 | // Returns the number of partitions reserved. |
| 406 | int |
| 407 | as_partition_prereserve_query(as_namespace* ns, bool can_partition_query[], |
| 408 | as_partition_reservation rsv[]) |
| 409 | { |
| 410 | int reserved = 0; |
| 411 | |
| 412 | for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) { |
| 413 | if (as_partition_reserve_query(ns, pid, &rsv[pid])) { |
| 414 | can_partition_query[pid] = false; |
| 415 | } |
| 416 | else { |
| 417 | can_partition_query[pid] = true; |
| 418 | reserved++; |
| 419 | } |
| 420 | } |
| 421 | |
| 422 | return reserved; |
| 423 | } |
| 424 | |
// Reserve a partition for query.
// Returns 0 if the reservation was taken, nonzero if not (-1 or -2, as
// documented for as_partition_reserve_write() above).
| 427 | int |
| 428 | as_partition_reserve_query(as_namespace* ns, uint32_t pid, |
| 429 | as_partition_reservation* rsv) |
| 430 | { |
| 431 | return as_partition_reserve_write(ns, pid, rsv, NULL); |
| 432 | } |
| 433 | |
| 434 | // Succeeds if we are full working master or prole. |
| 435 | int |
| 436 | as_partition_reserve_xdr_read(as_namespace* ns, uint32_t pid, |
| 437 | as_partition_reservation* rsv) |
| 438 | { |
| 439 | as_partition* p = &ns->partitions[pid]; |
| 440 | |
| 441 | cf_mutex_lock(&p->lock); |
| 442 | |
| 443 | int res = -1; |
| 444 | |
| 445 | if (p->pending_immigrations == 0 && |
| 446 | (g_config.self_node == p->working_master || |
| 447 | find_self_in_replicas(p) >= 0)) { |
| 448 | partition_reserve_lockfree(p, ns, rsv); |
| 449 | res = 0; |
| 450 | } |
| 451 | |
| 452 | cf_mutex_unlock(&p->lock); |
| 453 | |
| 454 | return res; |
| 455 | } |
| 456 | |
| 457 | void |
| 458 | as_partition_reservation_copy(as_partition_reservation* dst, |
| 459 | as_partition_reservation* src) |
| 460 | { |
| 461 | dst->ns = src->ns; |
| 462 | dst->p = src->p; |
| 463 | dst->tree = src->tree; |
| 464 | dst->regime = src->regime; |
| 465 | dst->n_dupl = src->n_dupl; |
| 466 | |
| 467 | if (dst->n_dupl != 0) { |
| 468 | memcpy(dst->dupl_nodes, src->dupl_nodes, sizeof(cf_node) * dst->n_dupl); |
| 469 | } |
| 470 | } |
| 471 | |
| 472 | void |
| 473 | as_partition_release(as_partition_reservation* rsv) |
| 474 | { |
| 475 | as_index_tree_release(rsv->tree); |
| 476 | } |
| 477 | |
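// Tree-ids come from a small fixed pool (MAX_NUM_TREE_IDS). p->tree_ids_used
// is a bitmap with one bit per id - a set bit means a dropped tree with that
// id hasn't yet been destroyed (see as_partition_tree_done() below).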
| 478 | void |
| 479 | as_partition_advance_tree_id(as_partition* p, const char* ns_name) |
| 480 | { |
| 481 | uint32_t n_hanging; |
| 482 | |
| 483 | // Find first available tree-id past current one. Should be very next one. |
| 484 | for (n_hanging = 0; n_hanging < MAX_NUM_TREE_IDS; n_hanging++) { |
| 485 | p->tree_id = (p->tree_id + 1) & TREE_ID_MASK; |
| 486 | |
| 487 | uint64_t id_mask = 1UL << p->tree_id; |
| 488 | |
| 489 | if ((p->tree_ids_used & id_mask) == 0) { |
| 490 | // Claim tree-id. Claim is relinquished when tree is destroyed. |
| 491 | p->tree_ids_used |= id_mask; |
| 492 | break; |
| 493 | } |
| 494 | } |
| 495 | |
// If no tree-id is available, we can't continue. Should never happen.
if (n_hanging == MAX_NUM_TREE_IDS) {
cf_crash(AS_PARTITION, "{%s} pid %u has all %u tree-ids in use",
ns_name, p->id, n_hanging);
}
| 501 | |
| 502 | // Too many hanging trees - ref-count leak? Offer chance to warm restart. |
| 503 | if (n_hanging > MAX_NUM_TREE_IDS / 2) { |
cf_warning(AS_PARTITION, "{%s} pid %u has %u dropped trees hanging",
ns_name, p->id, n_hanging);
| 506 | } |
| 507 | } |
| 508 | |
// Callback made when a dropped as_index_tree is finally destroyed.
| 510 | void |
| 511 | as_partition_tree_done(uint8_t id, void* udata) |
| 512 | { |
| 513 | as_partition* p = (as_partition*)udata; |
| 514 | |
| 515 | cf_mutex_lock(&p->lock); |
| 516 | |
| 517 | // Relinquish tree-id. |
| 518 | p->tree_ids_used &= ~(1UL << id); |
| 519 | |
| 520 | cf_mutex_unlock(&p->lock); |
| 521 | } |
| 522 | |
| 523 | void |
| 524 | as_partition_getinfo_str(cf_dyn_buf* db) |
| 525 | { |
| 526 | size_t db_sz = db->used_sz; |
| 527 | |
cf_dyn_buf_append_string(db, "namespace:partition:state:n_replicas:replica:"
"n_dupl:working_master:emigrates:lead_emigrates:immigrates:records:"
"tombstones:regime:version:final_version;");
| 531 | |
| 532 | for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) { |
| 533 | as_namespace* ns = g_config.namespaces[ns_ix]; |
| 534 | |
| 535 | for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) { |
| 536 | as_partition* p = &ns->partitions[pid]; |
| 537 | |
| 538 | cf_mutex_lock(&p->lock); |
| 539 | |
| 540 | cf_dyn_buf_append_string(db, ns->name); |
| 541 | cf_dyn_buf_append_char(db, ':'); |
| 542 | cf_dyn_buf_append_uint32(db, pid); |
| 543 | cf_dyn_buf_append_char(db, ':'); |
| 544 | cf_dyn_buf_append_char(db, partition_descriptor(p)); |
| 545 | cf_dyn_buf_append_char(db, ':'); |
| 546 | cf_dyn_buf_append_uint32(db, p->n_replicas); |
| 547 | cf_dyn_buf_append_char(db, ':'); |
| 548 | cf_dyn_buf_append_int(db, find_self_in_replicas(p)); |
| 549 | cf_dyn_buf_append_char(db, ':'); |
| 550 | cf_dyn_buf_append_uint32(db, p->n_dupl); |
| 551 | cf_dyn_buf_append_char(db, ':'); |
| 552 | cf_dyn_buf_append_uint64_x(db, p->working_master); |
| 553 | cf_dyn_buf_append_char(db, ':'); |
| 554 | cf_dyn_buf_append_uint32(db, p->pending_emigrations); |
| 555 | cf_dyn_buf_append_char(db, ':'); |
| 556 | cf_dyn_buf_append_uint32(db, p->pending_lead_emigrations); |
| 557 | cf_dyn_buf_append_char(db, ':'); |
| 558 | cf_dyn_buf_append_uint32(db, p->pending_immigrations); |
| 559 | cf_dyn_buf_append_char(db, ':'); |
| 560 | cf_dyn_buf_append_uint32(db, as_index_tree_size(p->tree)); |
| 561 | cf_dyn_buf_append_char(db, ':'); |
| 562 | cf_dyn_buf_append_uint64(db, p->n_tombstones); |
| 563 | cf_dyn_buf_append_char(db, ':'); |
| 564 | cf_dyn_buf_append_uint32(db, p->regime); |
| 565 | cf_dyn_buf_append_char(db, ':'); |
| 566 | cf_dyn_buf_append_string(db, VERSION_AS_STRING(&p->version)); |
| 567 | cf_dyn_buf_append_char(db, ':'); |
| 568 | cf_dyn_buf_append_string(db, VERSION_AS_STRING(&p->final_version)); |
| 569 | |
| 570 | cf_dyn_buf_append_char(db, ';'); |
| 571 | |
| 572 | cf_mutex_unlock(&p->lock); |
| 573 | } |
| 574 | } |
| 575 | |
| 576 | if (db_sz != db->used_sz) { |
| 577 | cf_dyn_buf_chomp(db); // take back the final ';' |
| 578 | } |
| 579 | } |
| 580 | |
| 581 | |
| 582 | //========================================================== |
| 583 | // Public API - client view replica maps. |
| 584 | // |
| 585 | |
| 586 | void |
| 587 | client_replica_maps_create(as_namespace* ns) |
| 588 | { |
| 589 | uint32_t size = sizeof(client_replica_map) * ns->cfg_replication_factor; |
| 590 | |
| 591 | ns->replica_maps = cf_malloc(size); |
| 592 | memset(ns->replica_maps, 0, size); |
| 593 | |
| 594 | for (uint32_t repl_ix = 0; repl_ix < ns->cfg_replication_factor; |
| 595 | repl_ix++) { |
| 596 | client_replica_map* repl_map = &ns->replica_maps[repl_ix]; |
| 597 | |
| 598 | cf_mutex_init(&repl_map->write_lock); |
| 599 | |
| 600 | cf_b64_encode((uint8_t*)repl_map->bitmap, |
| 601 | (uint32_t)sizeof(repl_map->bitmap), (char*)repl_map->b64map); |
| 602 | } |
| 603 | } |
| 604 | |
| 605 | void |
| 606 | client_replica_maps_clear(as_namespace* ns) |
| 607 | { |
| 608 | memset(ns->replica_maps, 0, |
| 609 | sizeof(client_replica_map) * ns->cfg_replication_factor); |
| 610 | |
| 611 | for (uint32_t repl_ix = 0; repl_ix < ns->cfg_replication_factor; |
| 612 | repl_ix++) { |
| 613 | client_replica_map* repl_map = &ns->replica_maps[repl_ix]; |
| 614 | |
| 615 | cf_b64_encode((uint8_t*)repl_map->bitmap, |
| 616 | (uint32_t)sizeof(repl_map->bitmap), (char*)repl_map->b64map); |
| 617 | } |
| 618 | } |
| 619 | |
| 620 | bool |
| 621 | client_replica_maps_update(as_namespace* ns, uint32_t pid) |
| 622 | { |
| 623 | uint32_t byte_i = pid >> 3; |
| 624 | uint32_t byte_chunk = (byte_i / 3); |
| 625 | uint32_t chunk_bitmap_offset = byte_chunk * 3; |
| 626 | uint32_t chunk_b64map_offset = byte_chunk << 2; |
| 627 | |
| 628 | uint32_t bytes_from_end = CLIENT_BITMAP_BYTES - chunk_bitmap_offset; |
| 629 | uint32_t input_size = bytes_from_end > 3 ? 3 : bytes_from_end; |
| 630 | |
| 631 | int replica = partition_get_replica_self_lockfree(ns, pid); // -1 if not |
| 632 | uint8_t set_mask = 0x80 >> (pid & 0x7); |
| 633 | bool changed = false; |
| 634 | |
| 635 | for (int repl_ix = 0; repl_ix < (int)ns->cfg_replication_factor; |
| 636 | repl_ix++) { |
| 637 | client_replica_map* repl_map = &ns->replica_maps[repl_ix]; |
| 638 | |
| 639 | volatile uint8_t* mbyte = repl_map->bitmap + byte_i; |
| 640 | |
| 641 | bool owned = replica == repl_ix || |
// The working master also owns any prole column that's still
// immigrating, and - if it's an acting master sitting in a prole
// column - that column itself (e.g. a full node migrating to a
// newly added final master).
| 645 | (replica == 0 && // am working master |
| 646 | should_working_master_own(ns, pid, repl_ix)); |
| 647 | |
| 648 | bool is_set = (*mbyte & set_mask) != 0; |
| 649 | bool needs_update = (owned && ! is_set) || (! owned && is_set); |
| 650 | |
| 651 | if (! needs_update) { |
| 652 | continue; |
| 653 | } |
| 654 | |
| 655 | volatile uint8_t* bitmap_chunk = repl_map->bitmap + chunk_bitmap_offset; |
| 656 | volatile char* b64map_chunk = repl_map->b64map + chunk_b64map_offset; |
| 657 | |
| 658 | cf_mutex_lock(&repl_map->write_lock); |
| 659 | |
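// Toggle the ownership bit, then re-encode just the affected chunk under
// the write lock.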
| 660 | *mbyte ^= set_mask; |
| 661 | cf_b64_encode((uint8_t*)bitmap_chunk, input_size, (char*)b64map_chunk); |
| 662 | |
| 663 | cf_mutex_unlock(&repl_map->write_lock); |
| 664 | |
| 665 | changed = true; |
| 666 | } |
| 667 | |
| 668 | return changed; |
| 669 | } |
| 670 | |
| 671 | bool |
| 672 | client_replica_maps_is_partition_queryable(const as_namespace* ns, uint32_t pid) |
| 673 | { |
| 674 | uint32_t byte_i = pid >> 3; |
| 675 | |
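// Only the first (master) column's map determines query-ability.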
| 676 | const client_replica_map* repl_map = ns->replica_maps; |
| 677 | const volatile uint8_t* mbyte = repl_map->bitmap + byte_i; |
| 678 | |
| 679 | uint8_t set_mask = 0x80 >> (pid & 0x7); |
| 680 | |
| 681 | return (*mbyte & set_mask) != 0; |
| 682 | } |
| 683 | |
| 684 | |
| 685 | //========================================================== |
| 686 | // Local helpers. |
| 687 | // |
| 688 | |
| 689 | // Find best node to handle read/write. Called within partition lock. |
| 690 | cf_node |
| 691 | find_best_node(const as_partition* p, bool is_read) |
| 692 | { |
| 693 | // Working master (final or acting) returns self, eventual master returns |
| 694 | // acting master. Others don't have p->working_master set. |
| 695 | if (p->working_master != (cf_node)0) { |
| 696 | return p->working_master; |
| 697 | } |
| 698 | |
| 699 | if (is_read && p->pending_immigrations == 0 && |
| 700 | find_self_in_replicas(p) > 0) { |
| 701 | return g_config.self_node; // may read from prole that's got everything |
| 702 | } |
| 703 | |
| 704 | return p->replicas[0]; // final master as a last resort |
| 705 | } |
| 706 | |
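// Tree size includes tombstones - subtract to get live objects, clamping at
// zero since these unsynchronized counts may be transiently inconsistent.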
| 707 | void |
| 708 | accumulate_replica_stats(const as_partition* p, uint64_t* p_n_objects, |
| 709 | uint64_t* p_n_tombstones) |
| 710 | { |
| 711 | int64_t n_tombstones = (int64_t)p->n_tombstones; |
| 712 | int64_t n_objects = (int64_t)as_index_tree_size(p->tree) - n_tombstones; |
| 713 | |
| 714 | *p_n_objects += n_objects > 0 ? (uint64_t)n_objects : 0; |
| 715 | *p_n_tombstones += (uint64_t)n_tombstones; |
| 716 | } |
| 717 | |
| 718 | void |
| 719 | partition_reserve_lockfree(as_partition* p, as_namespace* ns, |
| 720 | as_partition_reservation* rsv) |
| 721 | { |
| 722 | as_index_tree_reserve(p->tree); |
| 723 | |
| 724 | rsv->ns = ns; |
| 725 | rsv->p = p; |
| 726 | rsv->tree = p->tree; |
| 727 | rsv->regime = p->regime; |
| 728 | rsv->n_dupl = p->n_dupl; |
| 729 | |
| 730 | if (rsv->n_dupl != 0) { |
| 731 | memcpy(rsv->dupl_nodes, p->dupls, sizeof(cf_node) * rsv->n_dupl); |
| 732 | } |
| 733 | } |
| 734 | |
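// Single-letter partition state for the 'getinfo' string: 'S'/'D' - replica
// (settled/still immigrating), 'A' - non-replica with null version, 'Z'/'X' -
// non-replica whose version has/lacks data.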
| 735 | char |
| 736 | partition_descriptor(const as_partition* p) |
| 737 | { |
| 738 | if (find_self_in_replicas(p) >= 0) { // -1 if not |
| 739 | return p->pending_immigrations == 0 ? 'S' : 'D'; |
| 740 | } |
| 741 | |
| 742 | if (as_partition_version_is_null(&p->version)) { |
| 743 | return 'A'; |
| 744 | } |
| 745 | |
| 746 | return as_partition_version_has_data(&p->version) ? 'Z' : 'X'; |
| 747 | } |
| 748 | |
| 749 | int |
| 750 | partition_get_replica_self_lockfree(const as_namespace* ns, uint32_t pid) |
| 751 | { |
| 752 | const as_partition* p = &ns->partitions[pid]; |
| 753 | |
| 754 | if (g_config.self_node == p->working_master) { |
| 755 | return 0; |
| 756 | } |
| 757 | |
| 758 | int self_n = find_self_in_replicas(p); // -1 if not |
| 759 | |
| 760 | if (self_n > 0 && p->pending_immigrations == 0 && |
// Check self_n < replication_factor because replication_factor can lag
// behind (be less than) the partition's replica list count.
| 763 | self_n < (int)ns->replication_factor) { |
| 764 | return self_n; |
| 765 | } |
| 766 | |
| 767 | return -1; // not a replica |
| 768 | } |
| 769 | |
| 770 | bool |
| 771 | should_working_master_own(const as_namespace* ns, uint32_t pid, |
| 772 | uint32_t repl_ix) |
| 773 | { |
| 774 | const as_partition* p = &ns->partitions[pid]; |
| 775 | |
| 776 | return find_self_in_replicas(p) == (int)repl_ix || p->immigrators[repl_ix]; |
| 777 | } |
| 778 | |