| 1 | /* | 
|---|
| 2 | * partition.c | 
|---|
| 3 | * | 
|---|
| 4 | * Copyright (C) 2008-2018 Aerospike, Inc. | 
|---|
| 5 | * | 
|---|
| 6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor | 
|---|
| 7 | * license agreements. | 
|---|
| 8 | * | 
|---|
| 9 | * This program is free software: you can redistribute it and/or modify it under | 
|---|
| 10 | * the terms of the GNU Affero General Public License as published by the Free | 
|---|
| 11 | * Software Foundation, either version 3 of the License, or (at your option) any | 
|---|
| 12 | * later version. | 
|---|
| 13 | * | 
|---|
| 14 | * This program is distributed in the hope that it will be useful, but WITHOUT | 
|---|
| 15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 
|---|
| 16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more | 
|---|
| 17 | * details. | 
|---|
| 18 | * | 
|---|
| 19 | * You should have received a copy of the GNU Affero General Public License | 
|---|
| 20 | * along with this program.  If not, see http://www.gnu.org/licenses/ | 
|---|
| 21 | */ | 
|---|
| 22 |  | 
|---|
| 23 | //========================================================== | 
|---|
| 24 | // Includes. | 
|---|
| 25 | // | 
|---|
| 26 |  | 
|---|
| 27 | #include "fabric/partition.h" | 
|---|
| 28 |  | 
|---|
| 29 | #include <stdbool.h> | 
|---|
| 30 | #include <stddef.h> | 
|---|
| 31 | #include <stdint.h> | 
|---|
| 32 | #include <string.h> | 
|---|
| 33 |  | 
|---|
| 34 | #include "citrusleaf/alloc.h" | 
|---|
| 35 | #include "citrusleaf/cf_atomic.h" | 
|---|
| 36 | #include "citrusleaf/cf_b64.h" | 
|---|
| 37 |  | 
|---|
| 38 | #include "cf_mutex.h" | 
|---|
| 39 | #include "fault.h" | 
|---|
| 40 | #include "node.h" | 
|---|
| 41 |  | 
|---|
| 42 | #include "base/cfg.h" | 
|---|
| 43 | #include "base/datamodel.h" | 
|---|
| 44 | #include "base/index.h" | 
|---|
| 45 | #include "base/proto.h" | 
|---|
| 46 | #include "base/transaction.h" | 
|---|
| 47 | #include "fabric/partition_balance.h" | 
|---|
| 48 |  | 
|---|
| 49 |  | 
|---|
| 50 | //========================================================== | 
|---|
| 51 | // Forward declarations. | 
|---|
| 52 | // | 
|---|
| 53 |  | 
|---|
| 54 | cf_node find_best_node(const as_partition* p, bool is_read); | 
|---|
| 55 | void accumulate_replica_stats(const as_partition* p, uint64_t* p_n_objects, uint64_t* p_n_tombstones); | 
|---|
| 56 | void partition_reserve_lockfree(as_partition* p, as_namespace* ns, as_partition_reservation* rsv); | 
|---|
| 57 | char partition_descriptor(const as_partition* p); | 
|---|
| 58 | int partition_get_replica_self_lockfree(const as_namespace* ns, uint32_t pid); | 
|---|
| 59 | bool should_working_master_own(const as_namespace* ns, uint32_t pid, uint32_t repl_ix); | 
|---|
| 60 |  | 
|---|
| 61 |  | 
|---|
| 62 | //========================================================== | 
|---|
| 63 | // Public API. | 
|---|
| 64 | // | 
|---|
| 65 |  | 
|---|
| 66 | void | 
|---|
| 67 | as_partition_init(as_namespace* ns, uint32_t pid) | 
|---|
| 68 | { | 
|---|
| 69 | as_partition* p = &ns->partitions[pid]; | 
|---|
| 70 |  | 
|---|
| 71 | // Note - as_partition has been zeroed since it's a member of as_namespace. | 
|---|
| 72 | // Set non-zero members. | 
|---|
| 73 |  | 
|---|
| 74 | cf_mutex_init(&p->lock); | 
|---|
| 75 |  | 
|---|
| 76 | p->id = pid; | 
|---|
| 77 |  | 
|---|
| 78 | if (ns->cold_start) { | 
|---|
| 79 | return; // trees are created later when we know which ones we own | 
|---|
| 80 | } | 
|---|
| 81 |  | 
|---|
| 82 | p->tree = as_index_tree_resume(&ns->tree_shared, ns->xmem_trees, pid, | 
|---|
| 83 | as_partition_tree_done, (void*)p); | 
|---|
| 84 | } | 
|---|
| 85 |  | 
|---|
| 86 | void | 
|---|
| 87 | as_partition_shutdown(as_namespace* ns, uint32_t pid) | 
|---|
| 88 | { | 
|---|
| 89 | as_partition* p = &ns->partitions[pid]; | 
|---|
| 90 |  | 
|---|
| 91 | while (true) { | 
|---|
| 92 | cf_mutex_lock(&p->lock); | 
|---|
| 93 |  | 
|---|
| 94 | as_index_tree* tree = p->tree; | 
|---|
| 95 | as_index_tree_reserve(tree); | 
|---|
| 96 |  | 
|---|
| 97 | cf_mutex_unlock(&p->lock); | 
|---|
| 98 |  | 
|---|
| 99 | // Must come outside partition lock, since transactions may take | 
|---|
| 100 | // partition lock under the record (sprig) lock. | 
|---|
| 101 | as_index_tree_block(tree); | 
|---|
| 102 |  | 
|---|
| 103 | // If lucky, this remains locked and we complete shutdown. | 
|---|
| 104 | cf_mutex_lock(&p->lock); | 
|---|
| 105 |  | 
|---|
| 106 | if (tree == p->tree) { | 
|---|
| 107 | break; // lucky - same tree we blocked | 
|---|
| 108 | } | 
|---|
| 109 |  | 
|---|
| 110 | // Bad luck - blocked a tree that just got switched, block the new one. | 
|---|
| 111 | cf_mutex_unlock(&p->lock); | 
|---|
| 112 | } | 
|---|
| 113 | } | 
|---|
| 114 |  | 
|---|
| 115 | void | 
|---|
| 116 | as_partition_freeze(as_partition* p) | 
|---|
| 117 | { | 
|---|
| 118 | p->working_master = (cf_node)0; | 
|---|
| 119 |  | 
|---|
| 120 | p->n_nodes = 0; | 
|---|
| 121 | p->n_replicas = 0; | 
|---|
| 122 | p->n_dupl = 0; | 
|---|
| 123 |  | 
|---|
| 124 | p->pending_emigrations = 0; | 
|---|
| 125 | p->pending_lead_emigrations = 0; | 
|---|
| 126 | p->pending_immigrations = 0; | 
|---|
| 127 |  | 
|---|
| 128 | p->n_witnesses = 0; | 
|---|
| 129 | } | 
|---|
| 130 |  | 
|---|
| 131 | // Get a list of all nodes (excluding self) that are replicas for a specified | 
|---|
| 132 | // partition: place the list in *nv and return the number of nodes found. | 
|---|
| 133 | uint32_t | 
|---|
| 134 | as_partition_get_other_replicas(as_partition* p, cf_node* nv) | 
|---|
| 135 | { | 
|---|
| 136 | uint32_t n_other_replicas = 0; | 
|---|
| 137 |  | 
|---|
| 138 | cf_mutex_lock(&p->lock); | 
|---|
| 139 |  | 
|---|
| 140 | for (uint32_t repl_ix = 0; repl_ix < p->n_replicas; repl_ix++) { | 
|---|
| 141 | // Don't ever include yourself. | 
|---|
| 142 | if (p->replicas[repl_ix] == g_config.self_node) { | 
|---|
| 143 | continue; | 
|---|
| 144 | } | 
|---|
| 145 |  | 
|---|
| 146 | // Copy the node ID into the user-supplied vector. | 
|---|
| 147 | nv[n_other_replicas++] = p->replicas[repl_ix]; | 
|---|
| 148 | } | 
|---|
| 149 |  | 
|---|
| 150 | cf_mutex_unlock(&p->lock); | 
|---|
| 151 |  | 
|---|
| 152 | return n_other_replicas; | 
|---|
| 153 | } | 
|---|
| 154 |  | 
|---|
| 155 | cf_node | 
|---|
| 156 | as_partition_writable_node(as_namespace* ns, uint32_t pid) | 
|---|
| 157 | { | 
|---|
| 158 | as_partition* p = &ns->partitions[pid]; | 
|---|
| 159 |  | 
|---|
| 160 | cf_mutex_lock(&p->lock); | 
|---|
| 161 |  | 
|---|
| 162 | if (p->n_replicas == 0) { | 
|---|
| 163 | // This partition is unavailable. | 
|---|
| 164 | cf_mutex_unlock(&p->lock); | 
|---|
| 165 | return (cf_node)0; | 
|---|
| 166 | } | 
|---|
| 167 |  | 
|---|
| 168 | cf_node best_node = find_best_node(p, false); | 
|---|
| 169 |  | 
|---|
| 170 | cf_mutex_unlock(&p->lock); | 
|---|
| 171 |  | 
|---|
| 172 | return best_node; | 
|---|
| 173 | } | 
|---|
| 174 |  | 
|---|
| 175 | // If this node is an eventual master, return the acting master, else return 0. | 
|---|
| 176 | cf_node | 
|---|
| 177 | as_partition_proxyee_redirect(as_namespace* ns, uint32_t pid) | 
|---|
| 178 | { | 
|---|
| 179 | as_partition* p = &ns->partitions[pid]; | 
|---|
| 180 |  | 
|---|
| 181 | cf_mutex_lock(&p->lock); | 
|---|
| 182 |  | 
|---|
| 183 | cf_node node = (cf_node)0; | 
|---|
| 184 |  | 
|---|
| 185 | if (g_config.self_node == p->replicas[0] && | 
|---|
| 186 | g_config.self_node != p->working_master) { | 
|---|
| 187 | node = p->working_master; | 
|---|
| 188 | } | 
|---|
| 189 |  | 
|---|
| 190 | cf_mutex_unlock(&p->lock); | 
|---|
| 191 |  | 
|---|
| 192 | return node; | 
|---|
| 193 | } | 
|---|
| 194 |  | 
|---|
| 195 | void | 
|---|
| 196 | as_partition_get_replicas_master_str(cf_dyn_buf* db) | 
|---|
| 197 | { | 
|---|
| 198 | size_t db_sz = db->used_sz; | 
|---|
| 199 |  | 
|---|
| 200 | for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) { | 
|---|
| 201 | as_namespace* ns = g_config.namespaces[ns_ix]; | 
|---|
| 202 |  | 
|---|
| 203 | cf_dyn_buf_append_string(db, ns->name); | 
|---|
| 204 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 205 | cf_dyn_buf_append_buf(db, (uint8_t*)ns->replica_maps[0].b64map, | 
|---|
| 206 | sizeof(ns->replica_maps[0].b64map)); | 
|---|
| 207 | cf_dyn_buf_append_char(db, ';'); | 
|---|
| 208 | } | 
|---|
| 209 |  | 
|---|
| 210 | if (db_sz != db->used_sz) { | 
|---|
| 211 | cf_dyn_buf_chomp(db); | 
|---|
| 212 | } | 
|---|
| 213 | } | 
|---|
| 214 |  | 
|---|
| 215 | void | 
|---|
| 216 | as_partition_get_replicas_all_str(cf_dyn_buf* db, bool include_regime) | 
|---|
| 217 | { | 
|---|
| 218 | size_t db_sz = db->used_sz; | 
|---|
| 219 |  | 
|---|
| 220 | for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) { | 
|---|
| 221 | as_namespace* ns = g_config.namespaces[ns_ix]; | 
|---|
| 222 |  | 
|---|
| 223 | cf_dyn_buf_append_string(db, ns->name); | 
|---|
| 224 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 225 |  | 
|---|
| 226 | if (include_regime) { | 
|---|
| 227 | cf_dyn_buf_append_uint32(db, ns->rebalance_regime); | 
|---|
| 228 | cf_dyn_buf_append_char(db, ','); | 
|---|
| 229 | } | 
|---|
| 230 |  | 
|---|
| 231 | uint32_t repl_factor = ns->replication_factor; | 
|---|
| 232 |  | 
|---|
| 233 | // If we haven't rebalanced yet, report 1 column with no ownership. | 
|---|
| 234 | if (repl_factor == 0) { | 
|---|
| 235 | repl_factor = 1; | 
|---|
| 236 | } | 
|---|
| 237 |  | 
|---|
| 238 | cf_dyn_buf_append_uint32(db, repl_factor); | 
|---|
| 239 |  | 
|---|
| 240 | for (uint32_t repl_ix = 0; repl_ix < repl_factor; repl_ix++) { | 
|---|
| 241 | cf_dyn_buf_append_char(db, ','); | 
|---|
| 242 | cf_dyn_buf_append_buf(db, | 
|---|
| 243 | (uint8_t*)&ns->replica_maps[repl_ix].b64map, | 
|---|
| 244 | sizeof(ns->replica_maps[repl_ix].b64map)); | 
|---|
| 245 | } | 
|---|
| 246 |  | 
|---|
| 247 | cf_dyn_buf_append_char(db, ';'); | 
|---|
| 248 | } | 
|---|
| 249 |  | 
|---|
| 250 | if (db_sz != db->used_sz) { | 
|---|
| 251 | cf_dyn_buf_chomp(db); | 
|---|
| 252 | } | 
|---|
| 253 | } | 
|---|
| 254 |  | 
|---|
| 255 | void | 
|---|
| 256 | as_partition_get_replica_stats(as_namespace* ns, repl_stats* p_stats) | 
|---|
| 257 | { | 
|---|
| 258 | memset(p_stats, 0, sizeof(repl_stats)); | 
|---|
| 259 |  | 
|---|
| 260 | for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) { | 
|---|
| 261 | as_partition* p = &ns->partitions[pid]; | 
|---|
| 262 |  | 
|---|
| 263 | cf_mutex_lock(&p->lock); | 
|---|
| 264 |  | 
|---|
| 265 | if (g_config.self_node == p->working_master) { | 
|---|
| 266 | accumulate_replica_stats(p, | 
|---|
| 267 | &p_stats->n_master_objects, | 
|---|
| 268 | &p_stats->n_master_tombstones); | 
|---|
| 269 | } | 
|---|
| 270 | else if (find_self_in_replicas(p) >= 0) { // -1 if not | 
|---|
| 271 | accumulate_replica_stats(p, | 
|---|
| 272 | &p_stats->n_prole_objects, | 
|---|
| 273 | &p_stats->n_prole_tombstones); | 
|---|
| 274 | } | 
|---|
| 275 | else { | 
|---|
| 276 | accumulate_replica_stats(p, | 
|---|
| 277 | &p_stats->n_non_replica_objects, | 
|---|
| 278 | &p_stats->n_non_replica_tombstones); | 
|---|
| 279 | } | 
|---|
| 280 |  | 
|---|
| 281 | cf_mutex_unlock(&p->lock); | 
|---|
| 282 | } | 
|---|
| 283 | } | 
|---|
| 284 |  | 
|---|
| 285 | // TODO - what if partition is unavailable? | 
|---|
| 286 | void | 
|---|
| 287 | as_partition_reserve(as_namespace* ns, uint32_t pid, | 
|---|
| 288 | as_partition_reservation* rsv) | 
|---|
| 289 | { | 
|---|
| 290 | as_partition* p = &ns->partitions[pid]; | 
|---|
| 291 |  | 
|---|
| 292 | cf_mutex_lock(&p->lock); | 
|---|
| 293 |  | 
|---|
| 294 | partition_reserve_lockfree(p, ns, rsv); | 
|---|
| 295 |  | 
|---|
| 296 | cf_mutex_unlock(&p->lock); | 
|---|
| 297 | } | 
|---|
| 298 |  | 
|---|
| 299 | int | 
|---|
| 300 | as_partition_reserve_replica(as_namespace* ns, uint32_t pid, | 
|---|
| 301 | as_partition_reservation* rsv) | 
|---|
| 302 | { | 
|---|
| 303 | as_partition* p = &ns->partitions[pid]; | 
|---|
| 304 |  | 
|---|
| 305 | cf_mutex_lock(&p->lock); | 
|---|
| 306 |  | 
|---|
| 307 | if (! is_self_replica(p)) { | 
|---|
| 308 | cf_mutex_unlock(&p->lock); | 
|---|
| 309 | return AS_ERR_CLUSTER_KEY_MISMATCH; | 
|---|
| 310 | } | 
|---|
| 311 |  | 
|---|
| 312 | partition_reserve_lockfree(p, ns, rsv); | 
|---|
| 313 |  | 
|---|
| 314 | cf_mutex_unlock(&p->lock); | 
|---|
| 315 |  | 
|---|
| 316 | return AS_OK; | 
|---|
| 317 | } | 
|---|
| 318 |  | 
|---|
| 319 | // Returns: | 
|---|
| 320 | //  0 - reserved - node parameter returns self node | 
|---|
| 321 | // -1 - not reserved - node parameter returns other "better" node | 
|---|
| 322 | // -2 - not reserved - node parameter not filled - partition is unavailable | 
|---|
| 323 | int | 
|---|
| 324 | as_partition_reserve_write(as_namespace* ns, uint32_t pid, | 
|---|
| 325 | as_partition_reservation* rsv, cf_node* node) | 
|---|
| 326 | { | 
|---|
| 327 | as_partition* p = &ns->partitions[pid]; | 
|---|
| 328 |  | 
|---|
| 329 | cf_mutex_lock(&p->lock); | 
|---|
| 330 |  | 
|---|
| 331 | // If this partition is frozen, return. | 
|---|
| 332 | if (p->n_replicas == 0) { | 
|---|
| 333 | if (node) { | 
|---|
| 334 | *node = (cf_node)0; | 
|---|
| 335 | } | 
|---|
| 336 |  | 
|---|
| 337 | cf_mutex_unlock(&p->lock); | 
|---|
| 338 | return -2; | 
|---|
| 339 | } | 
|---|
| 340 |  | 
|---|
| 341 | cf_node best_node = find_best_node(p, false); | 
|---|
| 342 |  | 
|---|
| 343 | if (node) { | 
|---|
| 344 | *node = best_node; | 
|---|
| 345 | } | 
|---|
| 346 |  | 
|---|
| 347 | // If this node is not the appropriate one, return. | 
|---|
| 348 | if (best_node != g_config.self_node) { | 
|---|
| 349 | cf_mutex_unlock(&p->lock); | 
|---|
| 350 | return -1; | 
|---|
| 351 | } | 
|---|
| 352 |  | 
|---|
| 353 | partition_reserve_lockfree(p, ns, rsv); | 
|---|
| 354 |  | 
|---|
| 355 | cf_mutex_unlock(&p->lock); | 
|---|
| 356 |  | 
|---|
| 357 | return 0; | 
|---|
| 358 | } | 
|---|
| 359 |  | 
|---|
| 360 | // Returns: | 
|---|
| 361 | //  0 - reserved - node parameter returns self node | 
|---|
| 362 | // -1 - not reserved - node parameter returns other "better" node | 
|---|
| 363 | // -2 - not reserved - node parameter not filled - partition is unavailable | 
|---|
| 364 | int | 
|---|
| 365 | as_partition_reserve_read_tr(as_namespace* ns, uint32_t pid, as_transaction* tr, | 
|---|
| 366 | cf_node* node) | 
|---|
| 367 | { | 
|---|
| 368 | as_partition* p = &ns->partitions[pid]; | 
|---|
| 369 |  | 
|---|
| 370 | cf_mutex_lock(&p->lock); | 
|---|
| 371 |  | 
|---|
| 372 | // Handle unavailable partition. | 
|---|
| 373 | if (p->n_replicas == 0) { | 
|---|
| 374 | int result = partition_reserve_unavailable(ns, p, tr, node); | 
|---|
| 375 |  | 
|---|
| 376 | if (result == 0) { | 
|---|
| 377 | partition_reserve_lockfree(p, ns, &tr->rsv); | 
|---|
| 378 | } | 
|---|
| 379 |  | 
|---|
| 380 | cf_mutex_unlock(&p->lock); | 
|---|
| 381 | return result; | 
|---|
| 382 | } | 
|---|
| 383 |  | 
|---|
| 384 | cf_node best_node = find_best_node(p, | 
|---|
| 385 | ! partition_reserve_promote(ns, p, tr)); | 
|---|
| 386 |  | 
|---|
| 387 | if (node) { | 
|---|
| 388 | *node = best_node; | 
|---|
| 389 | } | 
|---|
| 390 |  | 
|---|
| 391 | // If this node is not the appropriate one, return. | 
|---|
| 392 | if (best_node != g_config.self_node) { | 
|---|
| 393 | cf_mutex_unlock(&p->lock); | 
|---|
| 394 | return -1; | 
|---|
| 395 | } | 
|---|
| 396 |  | 
|---|
| 397 | partition_reserve_lockfree(p, ns, &tr->rsv); | 
|---|
| 398 |  | 
|---|
| 399 | cf_mutex_unlock(&p->lock); | 
|---|
| 400 |  | 
|---|
| 401 | return 0; | 
|---|
| 402 | } | 
|---|
| 403 |  | 
|---|
| 404 | // Reserves all query-able partitions. | 
|---|
| 405 | // Returns the number of partitions reserved. | 
|---|
| 406 | int | 
|---|
| 407 | as_partition_prereserve_query(as_namespace* ns, bool can_partition_query[], | 
|---|
| 408 | as_partition_reservation rsv[]) | 
|---|
| 409 | { | 
|---|
| 410 | int reserved = 0; | 
|---|
| 411 |  | 
|---|
| 412 | for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) { | 
|---|
| 413 | if (as_partition_reserve_query(ns, pid, &rsv[pid])) { | 
|---|
| 414 | can_partition_query[pid] = false; | 
|---|
| 415 | } | 
|---|
| 416 | else { | 
|---|
| 417 | can_partition_query[pid] = true; | 
|---|
| 418 | reserved++; | 
|---|
| 419 | } | 
|---|
| 420 | } | 
|---|
| 421 |  | 
|---|
| 422 | return reserved; | 
|---|
| 423 | } | 
|---|
| 424 |  | 
|---|
| 425 | // Reserve a partition for query. | 
|---|
| 426 | // Return value 0 means the reservation was taken, -1 means not. | 
|---|
| 427 | int | 
|---|
| 428 | as_partition_reserve_query(as_namespace* ns, uint32_t pid, | 
|---|
| 429 | as_partition_reservation* rsv) | 
|---|
| 430 | { | 
|---|
| 431 | return as_partition_reserve_write(ns, pid, rsv, NULL); | 
|---|
| 432 | } | 
|---|
| 433 |  | 
|---|
| 434 | // Succeeds if we are full working master or prole. | 
|---|
| 435 | int | 
|---|
| 436 | as_partition_reserve_xdr_read(as_namespace* ns, uint32_t pid, | 
|---|
| 437 | as_partition_reservation* rsv) | 
|---|
| 438 | { | 
|---|
| 439 | as_partition* p = &ns->partitions[pid]; | 
|---|
| 440 |  | 
|---|
| 441 | cf_mutex_lock(&p->lock); | 
|---|
| 442 |  | 
|---|
| 443 | int res = -1; | 
|---|
| 444 |  | 
|---|
| 445 | if (p->pending_immigrations == 0 && | 
|---|
| 446 | (g_config.self_node == p->working_master || | 
|---|
| 447 | find_self_in_replicas(p) >= 0)) { | 
|---|
| 448 | partition_reserve_lockfree(p, ns, rsv); | 
|---|
| 449 | res = 0; | 
|---|
| 450 | } | 
|---|
| 451 |  | 
|---|
| 452 | cf_mutex_unlock(&p->lock); | 
|---|
| 453 |  | 
|---|
| 454 | return res; | 
|---|
| 455 | } | 
|---|
| 456 |  | 
|---|
| 457 | void | 
|---|
| 458 | as_partition_reservation_copy(as_partition_reservation* dst, | 
|---|
| 459 | as_partition_reservation* src) | 
|---|
| 460 | { | 
|---|
| 461 | dst->ns = src->ns; | 
|---|
| 462 | dst->p = src->p; | 
|---|
| 463 | dst->tree = src->tree; | 
|---|
| 464 | dst->regime = src->regime; | 
|---|
| 465 | dst->n_dupl = src->n_dupl; | 
|---|
| 466 |  | 
|---|
| 467 | if (dst->n_dupl != 0) { | 
|---|
| 468 | memcpy(dst->dupl_nodes, src->dupl_nodes, sizeof(cf_node) * dst->n_dupl); | 
|---|
| 469 | } | 
|---|
| 470 | } | 
|---|
| 471 |  | 
|---|
| 472 | void | 
|---|
| 473 | as_partition_release(as_partition_reservation* rsv) | 
|---|
| 474 | { | 
|---|
| 475 | as_index_tree_release(rsv->tree); | 
|---|
| 476 | } | 
|---|
| 477 |  | 
|---|
| 478 | void | 
|---|
| 479 | as_partition_advance_tree_id(as_partition* p, const char* ns_name) | 
|---|
| 480 | { | 
|---|
| 481 | uint32_t n_hanging; | 
|---|
| 482 |  | 
|---|
| 483 | // Find first available tree-id past current one. Should be very next one. | 
|---|
| 484 | for (n_hanging = 0; n_hanging < MAX_NUM_TREE_IDS; n_hanging++) { | 
|---|
| 485 | p->tree_id = (p->tree_id + 1) & TREE_ID_MASK; | 
|---|
| 486 |  | 
|---|
| 487 | uint64_t id_mask = 1UL << p->tree_id; | 
|---|
| 488 |  | 
|---|
| 489 | if ((p->tree_ids_used & id_mask) == 0) { | 
|---|
| 490 | // Claim tree-id. Claim is relinquished when tree is destroyed. | 
|---|
| 491 | p->tree_ids_used |= id_mask; | 
|---|
| 492 | break; | 
|---|
| 493 | } | 
|---|
| 494 | } | 
|---|
| 495 |  | 
|---|
| 496 | // If no available tree-ids, just stop. Should never happen. | 
|---|
| 497 | if (n_hanging == MAX_NUM_TREE_IDS) { | 
|---|
| 498 | cf_crash(AS_PARTITION, "{%s} pid %u has %u dropped trees hanging", | 
|---|
| 499 | ns_name, p->id, n_hanging); | 
|---|
| 500 | } | 
|---|
| 501 |  | 
|---|
| 502 | // Too many hanging trees - ref-count leak? Offer chance to warm restart. | 
|---|
| 503 | if (n_hanging > MAX_NUM_TREE_IDS / 2) { | 
|---|
| 504 | cf_warning(AS_PARTITION, "{%s} pid %u has %u dropped trees hanging", | 
|---|
| 505 | ns_name, p->id, n_hanging); | 
|---|
| 506 | } | 
|---|
| 507 | } | 
|---|
| 508 |  | 
|---|
| 509 | // Callback made when dropped as_index_tree is finally destroyed. | 
|---|
| 510 | void | 
|---|
| 511 | as_partition_tree_done(uint8_t id, void* udata) | 
|---|
| 512 | { | 
|---|
| 513 | as_partition* p = (as_partition*)udata; | 
|---|
| 514 |  | 
|---|
| 515 | cf_mutex_lock(&p->lock); | 
|---|
| 516 |  | 
|---|
| 517 | // Relinquish tree-id. | 
|---|
| 518 | p->tree_ids_used &= ~(1UL << id); | 
|---|
| 519 |  | 
|---|
| 520 | cf_mutex_unlock(&p->lock); | 
|---|
| 521 | } | 
|---|
| 522 |  | 
|---|
| 523 | void | 
|---|
| 524 | as_partition_getinfo_str(cf_dyn_buf* db) | 
|---|
| 525 | { | 
|---|
| 526 | size_t db_sz = db->used_sz; | 
|---|
| 527 |  | 
|---|
| 528 | cf_dyn_buf_append_string(db, "namespace:partition:state:n_replicas:replica:" | 
|---|
| 529 | "n_dupl:working_master:emigrates:lead_emigrates:immigrates:records:" | 
|---|
| 530 | "tombstones:regime:version:final_version;"); | 
|---|
| 531 |  | 
|---|
| 532 | for (uint32_t ns_ix = 0; ns_ix < g_config.n_namespaces; ns_ix++) { | 
|---|
| 533 | as_namespace* ns = g_config.namespaces[ns_ix]; | 
|---|
| 534 |  | 
|---|
| 535 | for (uint32_t pid = 0; pid < AS_PARTITIONS; pid++) { | 
|---|
| 536 | as_partition* p = &ns->partitions[pid]; | 
|---|
| 537 |  | 
|---|
| 538 | cf_mutex_lock(&p->lock); | 
|---|
| 539 |  | 
|---|
| 540 | cf_dyn_buf_append_string(db, ns->name); | 
|---|
| 541 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 542 | cf_dyn_buf_append_uint32(db, pid); | 
|---|
| 543 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 544 | cf_dyn_buf_append_char(db, partition_descriptor(p)); | 
|---|
| 545 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 546 | cf_dyn_buf_append_uint32(db, p->n_replicas); | 
|---|
| 547 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 548 | cf_dyn_buf_append_int(db, find_self_in_replicas(p)); | 
|---|
| 549 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 550 | cf_dyn_buf_append_uint32(db, p->n_dupl); | 
|---|
| 551 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 552 | cf_dyn_buf_append_uint64_x(db, p->working_master); | 
|---|
| 553 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 554 | cf_dyn_buf_append_uint32(db, p->pending_emigrations); | 
|---|
| 555 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 556 | cf_dyn_buf_append_uint32(db, p->pending_lead_emigrations); | 
|---|
| 557 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 558 | cf_dyn_buf_append_uint32(db, p->pending_immigrations); | 
|---|
| 559 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 560 | cf_dyn_buf_append_uint32(db, as_index_tree_size(p->tree)); | 
|---|
| 561 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 562 | cf_dyn_buf_append_uint64(db, p->n_tombstones); | 
|---|
| 563 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 564 | cf_dyn_buf_append_uint32(db, p->regime); | 
|---|
| 565 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 566 | cf_dyn_buf_append_string(db, VERSION_AS_STRING(&p->version)); | 
|---|
| 567 | cf_dyn_buf_append_char(db, ':'); | 
|---|
| 568 | cf_dyn_buf_append_string(db, VERSION_AS_STRING(&p->final_version)); | 
|---|
| 569 |  | 
|---|
| 570 | cf_dyn_buf_append_char(db, ';'); | 
|---|
| 571 |  | 
|---|
| 572 | cf_mutex_unlock(&p->lock); | 
|---|
| 573 | } | 
|---|
| 574 | } | 
|---|
| 575 |  | 
|---|
| 576 | if (db_sz != db->used_sz) { | 
|---|
| 577 | cf_dyn_buf_chomp(db); // take back the final ';' | 
|---|
| 578 | } | 
|---|
| 579 | } | 
|---|
| 580 |  | 
|---|
| 581 |  | 
|---|
| 582 | //========================================================== | 
|---|
| 583 | // Public API - client view replica maps. | 
|---|
| 584 | // | 
|---|
| 585 |  | 
|---|
| 586 | void | 
|---|
| 587 | client_replica_maps_create(as_namespace* ns) | 
|---|
| 588 | { | 
|---|
| 589 | uint32_t size = sizeof(client_replica_map) * ns->cfg_replication_factor; | 
|---|
| 590 |  | 
|---|
| 591 | ns->replica_maps = cf_malloc(size); | 
|---|
| 592 | memset(ns->replica_maps, 0, size); | 
|---|
| 593 |  | 
|---|
| 594 | for (uint32_t repl_ix = 0; repl_ix < ns->cfg_replication_factor; | 
|---|
| 595 | repl_ix++) { | 
|---|
| 596 | client_replica_map* repl_map = &ns->replica_maps[repl_ix]; | 
|---|
| 597 |  | 
|---|
| 598 | cf_mutex_init(&repl_map->write_lock); | 
|---|
| 599 |  | 
|---|
| 600 | cf_b64_encode((uint8_t*)repl_map->bitmap, | 
|---|
| 601 | (uint32_t)sizeof(repl_map->bitmap), (char*)repl_map->b64map); | 
|---|
| 602 | } | 
|---|
| 603 | } | 
|---|
| 604 |  | 
|---|
| 605 | void | 
|---|
| 606 | client_replica_maps_clear(as_namespace* ns) | 
|---|
| 607 | { | 
|---|
| 608 | memset(ns->replica_maps, 0, | 
|---|
| 609 | sizeof(client_replica_map) * ns->cfg_replication_factor); | 
|---|
| 610 |  | 
|---|
| 611 | for (uint32_t repl_ix = 0; repl_ix < ns->cfg_replication_factor; | 
|---|
| 612 | repl_ix++) { | 
|---|
| 613 | client_replica_map* repl_map = &ns->replica_maps[repl_ix]; | 
|---|
| 614 |  | 
|---|
| 615 | cf_b64_encode((uint8_t*)repl_map->bitmap, | 
|---|
| 616 | (uint32_t)sizeof(repl_map->bitmap), (char*)repl_map->b64map); | 
|---|
| 617 | } | 
|---|
| 618 | } | 
|---|
| 619 |  | 
|---|
| 620 | bool | 
|---|
| 621 | client_replica_maps_update(as_namespace* ns, uint32_t pid) | 
|---|
| 622 | { | 
|---|
| 623 | uint32_t byte_i = pid >> 3; | 
|---|
| 624 | uint32_t byte_chunk = (byte_i / 3); | 
|---|
| 625 | uint32_t chunk_bitmap_offset = byte_chunk * 3; | 
|---|
| 626 | uint32_t chunk_b64map_offset = byte_chunk << 2; | 
|---|
| 627 |  | 
|---|
| 628 | uint32_t bytes_from_end = CLIENT_BITMAP_BYTES - chunk_bitmap_offset; | 
|---|
| 629 | uint32_t input_size = bytes_from_end > 3 ? 3 : bytes_from_end; | 
|---|
| 630 |  | 
|---|
| 631 | int replica = partition_get_replica_self_lockfree(ns, pid); // -1 if not | 
|---|
| 632 | uint8_t set_mask = 0x80 >> (pid & 0x7); | 
|---|
| 633 | bool changed = false; | 
|---|
| 634 |  | 
|---|
| 635 | for (int repl_ix = 0; repl_ix < (int)ns->cfg_replication_factor; | 
|---|
| 636 | repl_ix++) { | 
|---|
| 637 | client_replica_map* repl_map = &ns->replica_maps[repl_ix]; | 
|---|
| 638 |  | 
|---|
| 639 | volatile uint8_t* mbyte = repl_map->bitmap + byte_i; | 
|---|
| 640 |  | 
|---|
| 641 | bool owned = replica == repl_ix || | 
|---|
| 642 | // Working master also owns all immigrating prole columns, and | 
|---|
| 643 | // if it's an acting master in a prole column, that column (e.g. | 
|---|
| 644 | // full, and migrating to newly added master). | 
|---|
| 645 | (replica == 0 && // am working master | 
|---|
| 646 | should_working_master_own(ns, pid, repl_ix)); | 
|---|
| 647 |  | 
|---|
| 648 | bool is_set = (*mbyte & set_mask) != 0; | 
|---|
| 649 | bool needs_update = (owned && ! is_set) || (! owned && is_set); | 
|---|
| 650 |  | 
|---|
| 651 | if (! needs_update) { | 
|---|
| 652 | continue; | 
|---|
| 653 | } | 
|---|
| 654 |  | 
|---|
| 655 | volatile uint8_t* bitmap_chunk = repl_map->bitmap + chunk_bitmap_offset; | 
|---|
| 656 | volatile char* b64map_chunk = repl_map->b64map + chunk_b64map_offset; | 
|---|
| 657 |  | 
|---|
| 658 | cf_mutex_lock(&repl_map->write_lock); | 
|---|
| 659 |  | 
|---|
| 660 | *mbyte ^= set_mask; | 
|---|
| 661 | cf_b64_encode((uint8_t*)bitmap_chunk, input_size, (char*)b64map_chunk); | 
|---|
| 662 |  | 
|---|
| 663 | cf_mutex_unlock(&repl_map->write_lock); | 
|---|
| 664 |  | 
|---|
| 665 | changed = true; | 
|---|
| 666 | } | 
|---|
| 667 |  | 
|---|
| 668 | return changed; | 
|---|
| 669 | } | 
|---|
| 670 |  | 
|---|
| 671 | bool | 
|---|
| 672 | client_replica_maps_is_partition_queryable(const as_namespace* ns, uint32_t pid) | 
|---|
| 673 | { | 
|---|
| 674 | uint32_t byte_i = pid >> 3; | 
|---|
| 675 |  | 
|---|
| 676 | const client_replica_map* repl_map = ns->replica_maps; | 
|---|
| 677 | const volatile uint8_t* mbyte = repl_map->bitmap + byte_i; | 
|---|
| 678 |  | 
|---|
| 679 | uint8_t set_mask = 0x80 >> (pid & 0x7); | 
|---|
| 680 |  | 
|---|
| 681 | return (*mbyte & set_mask) != 0; | 
|---|
| 682 | } | 
|---|
| 683 |  | 
|---|
| 684 |  | 
|---|
| 685 | //========================================================== | 
|---|
| 686 | // Local helpers. | 
|---|
| 687 | // | 
|---|
| 688 |  | 
|---|
| 689 | // Find best node to handle read/write. Called within partition lock. | 
|---|
| 690 | cf_node | 
|---|
| 691 | find_best_node(const as_partition* p, bool is_read) | 
|---|
| 692 | { | 
|---|
| 693 | // Working master (final or acting) returns self, eventual master returns | 
|---|
| 694 | // acting master. Others don't have p->working_master set. | 
|---|
| 695 | if (p->working_master != (cf_node)0) { | 
|---|
| 696 | return p->working_master; | 
|---|
| 697 | } | 
|---|
| 698 |  | 
|---|
| 699 | if (is_read && p->pending_immigrations == 0 && | 
|---|
| 700 | find_self_in_replicas(p) > 0) { | 
|---|
| 701 | return g_config.self_node; // may read from prole that's got everything | 
|---|
| 702 | } | 
|---|
| 703 |  | 
|---|
| 704 | return p->replicas[0]; // final master as a last resort | 
|---|
| 705 | } | 
|---|
| 706 |  | 
|---|
| 707 | void | 
|---|
| 708 | accumulate_replica_stats(const as_partition* p, uint64_t* p_n_objects, | 
|---|
| 709 | uint64_t* p_n_tombstones) | 
|---|
| 710 | { | 
|---|
| 711 | int64_t n_tombstones = (int64_t)p->n_tombstones; | 
|---|
| 712 | int64_t n_objects = (int64_t)as_index_tree_size(p->tree) - n_tombstones; | 
|---|
| 713 |  | 
|---|
| 714 | *p_n_objects += n_objects > 0 ? (uint64_t)n_objects : 0; | 
|---|
| 715 | *p_n_tombstones += (uint64_t)n_tombstones; | 
|---|
| 716 | } | 
|---|
| 717 |  | 
|---|
| 718 | void | 
|---|
| 719 | partition_reserve_lockfree(as_partition* p, as_namespace* ns, | 
|---|
| 720 | as_partition_reservation* rsv) | 
|---|
| 721 | { | 
|---|
| 722 | as_index_tree_reserve(p->tree); | 
|---|
| 723 |  | 
|---|
| 724 | rsv->ns = ns; | 
|---|
| 725 | rsv->p = p; | 
|---|
| 726 | rsv->tree = p->tree; | 
|---|
| 727 | rsv->regime = p->regime; | 
|---|
| 728 | rsv->n_dupl = p->n_dupl; | 
|---|
| 729 |  | 
|---|
| 730 | if (rsv->n_dupl != 0) { | 
|---|
| 731 | memcpy(rsv->dupl_nodes, p->dupls, sizeof(cf_node) * rsv->n_dupl); | 
|---|
| 732 | } | 
|---|
| 733 | } | 
|---|
| 734 |  | 
|---|
| 735 | char | 
|---|
| 736 | partition_descriptor(const as_partition* p) | 
|---|
| 737 | { | 
|---|
| 738 | if (find_self_in_replicas(p) >= 0) { // -1 if not | 
|---|
| 739 | return p->pending_immigrations == 0 ? 'S' : 'D'; | 
|---|
| 740 | } | 
|---|
| 741 |  | 
|---|
| 742 | if (as_partition_version_is_null(&p->version)) { | 
|---|
| 743 | return 'A'; | 
|---|
| 744 | } | 
|---|
| 745 |  | 
|---|
| 746 | return as_partition_version_has_data(&p->version) ? 'Z' : 'X'; | 
|---|
| 747 | } | 
|---|
| 748 |  | 
|---|
| 749 | int | 
|---|
| 750 | partition_get_replica_self_lockfree(const as_namespace* ns, uint32_t pid) | 
|---|
| 751 | { | 
|---|
| 752 | const as_partition* p = &ns->partitions[pid]; | 
|---|
| 753 |  | 
|---|
| 754 | if (g_config.self_node == p->working_master) { | 
|---|
| 755 | return 0; | 
|---|
| 756 | } | 
|---|
| 757 |  | 
|---|
| 758 | int self_n = find_self_in_replicas(p); // -1 if not | 
|---|
| 759 |  | 
|---|
| 760 | if (self_n > 0 && p->pending_immigrations == 0 && | 
|---|
| 761 | // Check self_n < n_repl only because n_repl could be out-of-sync | 
|---|
| 762 | // with (less than) partition's replica list count. | 
|---|
| 763 | self_n < (int)ns->replication_factor) { | 
|---|
| 764 | return self_n; | 
|---|
| 765 | } | 
|---|
| 766 |  | 
|---|
| 767 | return -1; // not a replica | 
|---|
| 768 | } | 
|---|
| 769 |  | 
|---|
| 770 | bool | 
|---|
| 771 | should_working_master_own(const as_namespace* ns, uint32_t pid, | 
|---|
| 772 | uint32_t repl_ix) | 
|---|
| 773 | { | 
|---|
| 774 | const as_partition* p = &ns->partitions[pid]; | 
|---|
| 775 |  | 
|---|
| 776 | return find_self_in_replicas(p) == (int)repl_ix || p->immigrators[repl_ix]; | 
|---|
| 777 | } | 
|---|
| 778 |  | 
|---|