| 1 | /* |
| 2 | * proxy.c |
| 3 | * |
| 4 | * Copyright (C) 2016 Aerospike, Inc. |
| 5 | * |
| 6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
| 7 | * license agreements. |
| 8 | * |
| 9 | * This program is free software: you can redistribute it and/or modify it under |
| 10 | * the terms of the GNU Affero General Public License as published by the Free |
| 11 | * Software Foundation, either version 3 of the License, or (at your option) any |
| 12 | * later version. |
| 13 | * |
| 14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
| 15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
| 17 | * details. |
| 18 | * |
| 19 | * You should have received a copy of the GNU Affero General Public License |
| 20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
| 21 | */ |
| 22 | |
| 23 | //========================================================== |
| 24 | // Includes. |
| 25 | // |
| 26 | |
| 27 | #include "transaction/proxy.h" |
| 28 | |
| 29 | #include <errno.h> |
| 30 | #include <stdbool.h> |
| 31 | #include <stddef.h> |
| 32 | #include <stdint.h> |
| 33 | #include <unistd.h> |
| 34 | |
| 35 | #include "citrusleaf/alloc.h" |
| 36 | #include "citrusleaf/cf_atomic.h" |
| 37 | #include "citrusleaf/cf_clock.h" |
| 38 | #include "citrusleaf/cf_digest.h" |
| 39 | |
| 40 | #include "cf_mutex.h" |
| 41 | #include "cf_thread.h" |
| 42 | #include "dynbuf.h" |
| 43 | #include "fault.h" |
| 44 | #include "msg.h" |
| 45 | #include "node.h" |
| 46 | #include "shash.h" |
| 47 | #include "socket.h" |
| 48 | |
| 49 | #include "base/batch.h" |
| 50 | #include "base/datamodel.h" |
| 51 | #include "base/health.h" |
| 52 | #include "base/proto.h" |
| 53 | #include "base/service.h" |
| 54 | #include "base/stats.h" |
| 55 | #include "base/transaction.h" |
| 56 | #include "fabric/exchange.h" |
| 57 | #include "fabric/fabric.h" |
| 58 | #include "fabric/partition.h" |
| 59 | #include "transaction/rw_request.h" |
| 60 | #include "transaction/rw_request_hash.h" |
| 61 | #include "transaction/rw_utils.h" |
| 62 | #include "transaction/udf.h" |
| 63 | |
| 64 | |
| 65 | //========================================================== |
| 66 | // Typedefs & constants. |
| 67 | // |
| 68 | |
// Field IDs of an M_TYPE_PROXY fabric message.
//
// These values go on the wire, so mind backward compatibility if changing -
// the numbers are spelled out to make accidental renumbering obvious.
typedef enum {
	PROXY_FIELD_OP = 0,       // one of the PROXY_OP_* values
	PROXY_FIELD_TID = 1,      // proxyer's transaction id
	PROXY_FIELD_DIGEST = 2,   // record digest (cf_digest)
	PROXY_FIELD_REDIRECT = 3, // node id carried by "return to sender"
	PROXY_FIELD_AS_PROTO = 4, // wrapped as_proto - currently only as_msg's
	PROXY_FIELD_UNUSED_5 = 5,
	PROXY_FIELD_UNUSED_6 = 6,
	PROXY_FIELD_UNUSED_7 = 7,

	NUM_PROXY_FIELDS
} proxy_msg_field;
| 82 | |
// Values carried in PROXY_FIELD_OP - they select the handler in proxy_msg_cb().
#define PROXY_OP_REQUEST          1 // proxyer -> proxyee: run this transaction
#define PROXY_OP_RESPONSE         2 // proxyee -> proxyer: here is the result
#define PROXY_OP_RETURN_TO_SENDER 3 // proxyee -> proxyer: try again elsewhere
| 86 | |
| 87 | const msg_template proxy_mt[] = { |
| 88 | { PROXY_FIELD_OP, M_FT_UINT32 }, |
| 89 | { PROXY_FIELD_TID, M_FT_UINT32 }, |
| 90 | { PROXY_FIELD_DIGEST, M_FT_BUF }, |
| 91 | { PROXY_FIELD_REDIRECT, M_FT_UINT64 }, |
| 92 | { PROXY_FIELD_AS_PROTO, M_FT_BUF }, |
| 93 | { PROXY_FIELD_UNUSED_5, M_FT_UINT64 }, |
| 94 | { PROXY_FIELD_UNUSED_6, M_FT_UINT32 }, |
| 95 | { PROXY_FIELD_UNUSED_7, M_FT_UINT32 }, |
| 96 | }; |
| 97 | |
| 98 | COMPILER_ASSERT(sizeof(proxy_mt) / sizeof(msg_template) == NUM_PROXY_FIELDS); |
| 99 | |
| 100 | #define PROXY_MSG_SCRATCH_SIZE 128 |
| 101 | |
| 102 | typedef struct proxy_request_s { |
| 103 | uint32_t msg_fields; |
| 104 | |
| 105 | uint8_t origin; |
| 106 | uint8_t from_flags; |
| 107 | |
| 108 | union { |
| 109 | void* any; |
| 110 | as_file_handle* proto_fd_h; |
| 111 | as_batch_shared* batch_shared; |
| 112 | // No need yet for other members of this union. |
| 113 | } from; |
| 114 | |
| 115 | // No need yet for a 'from_data" union. |
| 116 | uint32_t batch_index; |
| 117 | |
| 118 | uint64_t start_time; |
| 119 | uint64_t end_time; |
| 120 | |
| 121 | // The original proxy message. |
| 122 | msg* fab_msg; |
| 123 | |
| 124 | as_namespace* ns; |
| 125 | } proxy_request; |
| 126 | |
| 127 | |
| 128 | //========================================================== |
| 129 | // Globals. |
| 130 | // |
| 131 | |
| 132 | static cf_shash* g_proxy_hash = NULL; |
| 133 | static cf_atomic32 g_proxy_tid = 0; |
| 134 | |
| 135 | |
| 136 | //========================================================== |
| 137 | // Forward declarations. |
| 138 | // |
| 139 | |
| 140 | void* run_proxy_timeout(void* arg); |
| 141 | int proxy_timeout_reduce_fn(const void* key, void* data, void* udata); |
| 142 | |
| 143 | int proxy_msg_cb(cf_node src, msg* m, void* udata); |
| 144 | |
| 145 | cl_msg* new_msg_w_extra_field(const cl_msg* msgp, const as_msg_field* f); |
| 146 | void proxyer_handle_response(msg* m, uint32_t tid); |
| 147 | int proxyer_handle_client_response(msg* m, proxy_request* pr); |
| 148 | int proxyer_handle_batch_response(msg* m, proxy_request* pr); |
| 149 | void proxyer_handle_return_to_sender(msg* m, uint32_t tid); |
| 150 | |
| 151 | void proxyee_handle_request(cf_node src, msg* m, uint32_t tid); |
| 152 | |
| 153 | |
| 154 | //========================================================== |
| 155 | // Inlines & macros. |
| 156 | // |
| 157 | |
| 158 | static inline void |
| 159 | error_response(cf_node src, uint32_t tid, uint32_t error) |
| 160 | { |
| 161 | as_proxy_send_response(src, tid, error, 0, 0, NULL, NULL, 0, NULL, 0); |
| 162 | } |
| 163 | |
| 164 | static inline void |
| 165 | client_proxy_update_stats(as_namespace* ns, uint8_t result_code) |
| 166 | { |
| 167 | switch (result_code) { |
| 168 | case AS_OK: |
| 169 | cf_atomic64_incr(&ns->n_client_proxy_complete); |
| 170 | break; |
| 171 | case AS_ERR_TIMEOUT: |
| 172 | cf_atomic64_incr(&ns->n_client_proxy_timeout); |
| 173 | break; |
| 174 | default: |
| 175 | cf_atomic64_incr(&ns->n_client_proxy_error); |
| 176 | break; |
| 177 | } |
| 178 | } |
| 179 | |
| 180 | static inline void |
| 181 | batch_sub_proxy_update_stats(as_namespace* ns, uint8_t result_code) |
| 182 | { |
| 183 | switch (result_code) { |
| 184 | case AS_OK: |
| 185 | cf_atomic64_incr(&ns->n_batch_sub_proxy_complete); |
| 186 | break; |
| 187 | case AS_ERR_TIMEOUT: |
| 188 | cf_atomic64_incr(&ns->n_batch_sub_proxy_timeout); |
| 189 | break; |
| 190 | default: |
| 191 | cf_atomic64_incr(&ns->n_batch_sub_proxy_error); |
| 192 | break; |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | |
| 197 | //========================================================== |
| 198 | // Public API. |
| 199 | // |
| 200 | |
| 201 | void |
| 202 | as_proxy_init() |
| 203 | { |
| 204 | g_proxy_hash = cf_shash_create(cf_shash_fn_u32, sizeof(uint32_t), |
| 205 | sizeof(proxy_request), 4 * 1024, CF_SHASH_MANY_LOCK); |
| 206 | |
| 207 | cf_thread_create_detached(run_proxy_timeout, NULL); |
| 208 | |
| 209 | as_fabric_register_msg_fn(M_TYPE_PROXY, proxy_mt, sizeof(proxy_mt), |
| 210 | PROXY_MSG_SCRATCH_SIZE, proxy_msg_cb, NULL); |
| 211 | } |
| 212 | |
| 213 | |
| 214 | uint32_t |
| 215 | as_proxy_hash_count() |
| 216 | { |
| 217 | return cf_shash_get_size(g_proxy_hash); |
| 218 | } |
| 219 | |
| 220 | |
| 221 | // Proxyer - divert a transaction request to another node. |
| 222 | void |
| 223 | as_proxy_divert(cf_node dst, as_transaction* tr, as_namespace* ns) |
| 224 | { |
| 225 | // Special log detail. |
| 226 | switch (tr->origin) { |
| 227 | case FROM_CLIENT: |
| 228 | cf_detail_digest(AS_PROXY_DIVERT, &tr->keyd, |
| 229 | "{%s} diverting from client %s to node %lx " , |
| 230 | ns->name, tr->from.proto_fd_h->client, dst); |
| 231 | break; |
| 232 | case FROM_BATCH: |
| 233 | cf_detail_digest(AS_PROXY_DIVERT, &tr->keyd, |
| 234 | "{%s} diverting batch-sub from client %s to node %lx " , |
| 235 | ns->name, as_batch_get_fd_h(tr->from.batch_shared)->client, |
| 236 | dst); |
| 237 | break; |
| 238 | default: |
| 239 | cf_crash(AS_PROXY, "unexpected transaction origin %u" , tr->origin); |
| 240 | break; |
| 241 | } |
| 242 | |
| 243 | // Get a fabric message and fill it out. |
| 244 | |
| 245 | msg* m = as_fabric_msg_get(M_TYPE_PROXY); |
| 246 | |
| 247 | uint32_t tid = cf_atomic32_incr(&g_proxy_tid); |
| 248 | |
| 249 | msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_REQUEST); |
| 250 | msg_set_uint32(m, PROXY_FIELD_TID, tid); |
| 251 | msg_set_buf(m, PROXY_FIELD_DIGEST, (void*)&tr->keyd, sizeof(cf_digest), |
| 252 | MSG_SET_COPY); |
| 253 | |
| 254 | if (tr->origin == FROM_BATCH) { |
| 255 | as_msg_field* f = as_batch_get_predexp_mf(tr->from.batch_shared); |
| 256 | |
| 257 | if (f == NULL) { |
| 258 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, (void*)tr->msgp, |
| 259 | sizeof(as_proto) + tr->msgp->proto.sz, MSG_SET_COPY); |
| 260 | } |
| 261 | else { |
| 262 | cl_msg* msgp = new_msg_w_extra_field(tr->msgp, f); |
| 263 | |
| 264 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, (void*)msgp, |
| 265 | sizeof(as_proto) + msgp->proto.sz, MSG_SET_HANDOFF_MALLOC); |
| 266 | } |
| 267 | } |
| 268 | else { |
| 269 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, (void*)tr->msgp, |
| 270 | sizeof(as_proto) + tr->msgp->proto.sz, MSG_SET_HANDOFF_MALLOC); |
| 271 | } |
| 272 | |
| 273 | // Set up a proxy_request and insert it in the hash. |
| 274 | |
| 275 | proxy_request pr; |
| 276 | |
| 277 | pr.msg_fields = tr->msg_fields; |
| 278 | |
| 279 | pr.origin = tr->origin; |
| 280 | pr.from_flags = tr->from_flags; |
| 281 | pr.from.any = tr->from.any; |
| 282 | pr.batch_index = tr->from_data.batch_index; |
| 283 | |
| 284 | pr.start_time = tr->start_time; |
| 285 | pr.end_time = tr->end_time; |
| 286 | |
| 287 | pr.fab_msg = m; |
| 288 | |
| 289 | pr.ns = ns; |
| 290 | |
| 291 | msg_incr_ref(m); // reference for the hash |
| 292 | |
| 293 | cf_shash_put(g_proxy_hash, &tid, &pr); |
| 294 | |
| 295 | tr->msgp = NULL; // pattern, not needed |
| 296 | tr->from.any = NULL; // pattern, not needed |
| 297 | |
| 298 | // Send fabric message to remote node. |
| 299 | |
| 300 | if (as_fabric_send(dst, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) { |
| 301 | as_fabric_msg_put(m); |
| 302 | } |
| 303 | |
| 304 | as_health_add_node_counter(dst, AS_HEALTH_NODE_PROXIES); |
| 305 | } |
| 306 | |
| 307 | |
| 308 | // Proxyee - transaction reservation failed here, tell proxyer to try again. |
| 309 | void |
| 310 | as_proxy_return_to_sender(const as_transaction* tr, as_namespace* ns) |
| 311 | { |
| 312 | msg* m = as_fabric_msg_get(M_TYPE_PROXY); |
| 313 | uint32_t pid = as_partition_getid(&tr->keyd); |
| 314 | cf_node redirect_node = as_partition_proxyee_redirect(ns, pid); |
| 315 | |
| 316 | msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_RETURN_TO_SENDER); |
| 317 | msg_set_uint32(m, PROXY_FIELD_TID, tr->from_data.proxy_tid); |
| 318 | msg_set_uint64(m, PROXY_FIELD_REDIRECT, |
| 319 | redirect_node == (cf_node)0 ? tr->from.proxy_node : redirect_node); |
| 320 | |
| 321 | if (as_fabric_send(tr->from.proxy_node, m, AS_FABRIC_CHANNEL_RW) != |
| 322 | AS_FABRIC_SUCCESS) { |
| 323 | as_fabric_msg_put(m); |
| 324 | } |
| 325 | } |
| 326 | |
| 327 | |
| 328 | // Proxyee - transaction completed here, send response to proxyer. |
| 329 | void |
| 330 | as_proxy_send_response(cf_node dst, uint32_t proxy_tid, uint32_t result_code, |
| 331 | uint32_t generation, uint32_t void_time, as_msg_op** ops, as_bin** bins, |
| 332 | uint16_t bin_count, as_namespace* ns, uint64_t trid) |
| 333 | { |
| 334 | msg* m = as_fabric_msg_get(M_TYPE_PROXY); |
| 335 | |
| 336 | msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_RESPONSE); |
| 337 | msg_set_uint32(m, PROXY_FIELD_TID, proxy_tid); |
| 338 | |
| 339 | size_t msg_sz = 0; |
| 340 | uint8_t* msgp = (uint8_t*)as_msg_make_response_msg(result_code, generation, |
| 341 | void_time, ops, bins, bin_count, ns, 0, &msg_sz, trid); |
| 342 | |
| 343 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, msgp, msg_sz, MSG_SET_HANDOFF_MALLOC); |
| 344 | |
| 345 | if (as_fabric_send(dst, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) { |
| 346 | as_fabric_msg_put(m); |
| 347 | } |
| 348 | } |
| 349 | |
| 350 | |
| 351 | // Proxyee - transaction completed here, send response to proxyer. |
| 352 | void |
| 353 | as_proxy_send_ops_response(cf_node dst, uint32_t proxy_tid, cf_dyn_buf* db) |
| 354 | { |
| 355 | msg* m = as_fabric_msg_get(M_TYPE_PROXY); |
| 356 | |
| 357 | msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_RESPONSE); |
| 358 | msg_set_uint32(m, PROXY_FIELD_TID, proxy_tid); |
| 359 | |
| 360 | uint8_t* msgp = db->buf; |
| 361 | size_t msg_sz = db->used_sz; |
| 362 | |
| 363 | if (db->is_stack) { |
| 364 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, msgp, msg_sz, MSG_SET_COPY); |
| 365 | } |
| 366 | else { |
| 367 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, msgp, msg_sz, |
| 368 | MSG_SET_HANDOFF_MALLOC); |
| 369 | db->buf = NULL; // the fabric owns the buffer now |
| 370 | } |
| 371 | |
| 372 | if (as_fabric_send(dst, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) { |
| 373 | as_fabric_msg_put(m); |
| 374 | } |
| 375 | } |
| 376 | |
| 377 | |
| 378 | //========================================================== |
| 379 | // Local helpers - proxyer. |
| 380 | // |
| 381 | |
| 382 | cl_msg* |
| 383 | (const cl_msg* msgp, const as_msg_field* f) |
| 384 | { |
| 385 | size_t old_sz = sizeof(as_proto) + msgp->proto.sz; |
| 386 | size_t = sizeof(f->field_sz) + f->field_sz; |
| 387 | cl_msg* new_msgp = cf_malloc(old_sz + extra_sz); |
| 388 | |
| 389 | memcpy(new_msgp, msgp, old_sz); |
| 390 | memcpy((uint8_t*)new_msgp + old_sz, f, extra_sz); |
| 391 | |
| 392 | new_msgp->proto.sz += extra_sz; |
| 393 | new_msgp->msg.n_fields++; |
| 394 | |
| 395 | return new_msgp; |
| 396 | } |
| 397 | |
| 398 | void |
| 399 | proxyer_handle_response(msg* m, uint32_t tid) |
| 400 | { |
| 401 | proxy_request pr; |
| 402 | |
| 403 | if (cf_shash_get_and_delete(g_proxy_hash, &tid, &pr) != CF_SHASH_OK) { |
| 404 | // Some other response (or timeout) has already finished this pr. |
| 405 | return; |
| 406 | } |
| 407 | |
| 408 | cf_assert(pr.from.any, AS_PROXY, "origin %u has null 'from'" , pr.origin); |
| 409 | |
| 410 | int result; |
| 411 | |
| 412 | switch (pr.origin) { |
| 413 | case FROM_CLIENT: |
| 414 | result = proxyer_handle_client_response(m, &pr); |
| 415 | client_proxy_update_stats(pr.ns, result); |
| 416 | break; |
| 417 | case FROM_BATCH: |
| 418 | result = proxyer_handle_batch_response(m, &pr); |
| 419 | batch_sub_proxy_update_stats(pr.ns, result); |
| 420 | // Note - no worries about msgp, proxy divert copied it. |
| 421 | break; |
| 422 | default: |
| 423 | cf_crash(AS_PROXY, "unexpected transaction origin %u" , pr.origin); |
| 424 | break; |
| 425 | } |
| 426 | |
| 427 | pr.from.any = NULL; // pattern, not needed |
| 428 | |
| 429 | as_fabric_msg_put(pr.fab_msg); |
| 430 | |
| 431 | // Note that this includes both origins. |
| 432 | if (pr.ns->proxy_hist_enabled) { |
| 433 | histogram_insert_data_point(pr.ns->proxy_hist, pr.start_time); |
| 434 | } |
| 435 | } |
| 436 | |
| 437 | |
| 438 | int |
| 439 | proxyer_handle_client_response(msg* m, proxy_request* pr) |
| 440 | { |
| 441 | uint8_t* proto; |
| 442 | size_t proto_sz; |
| 443 | |
| 444 | if (msg_get_buf(m, PROXY_FIELD_AS_PROTO, &proto, &proto_sz, |
| 445 | MSG_GET_DIRECT) != 0) { |
| 446 | cf_warning(AS_PROXY, "msg get for proto failed" ); |
| 447 | return AS_ERR_UNKNOWN; |
| 448 | } |
| 449 | |
| 450 | as_file_handle* fd_h = pr->from.proto_fd_h; |
| 451 | |
| 452 | if (cf_socket_send_all(&fd_h->sock, proto, proto_sz, MSG_NOSIGNAL, |
| 453 | CF_SOCKET_TIMEOUT) < 0) { |
| 454 | // Common when a client aborts. |
| 455 | as_end_of_transaction_force_close(fd_h); |
| 456 | return AS_ERR_UNKNOWN; |
| 457 | } |
| 458 | |
| 459 | as_end_of_transaction_ok(fd_h); |
| 460 | return AS_OK; |
| 461 | } |
| 462 | |
| 463 | |
| 464 | int |
| 465 | proxyer_handle_batch_response(msg* m, proxy_request* pr) |
| 466 | { |
| 467 | cl_msg* msgp; |
| 468 | size_t msgp_sz; |
| 469 | |
| 470 | if (msg_get_buf(m, PROXY_FIELD_AS_PROTO, (uint8_t**)&msgp, &msgp_sz, |
| 471 | MSG_GET_DIRECT) != 0) { |
| 472 | cf_warning(AS_PROXY, "msg get for proto failed" ); |
| 473 | return AS_ERR_UNKNOWN; |
| 474 | } |
| 475 | |
| 476 | cf_digest* keyd; |
| 477 | |
| 478 | if (msg_get_buf(pr->fab_msg, PROXY_FIELD_DIGEST, (uint8_t**)&keyd, NULL, |
| 479 | MSG_GET_DIRECT) != 0) { |
| 480 | cf_crash(AS_PROXY, "original msg get for digest failed" ); |
| 481 | } |
| 482 | |
| 483 | as_batch_add_proxy_result(pr->from.batch_shared, pr->batch_index, keyd, |
| 484 | msgp, msgp_sz); |
| 485 | |
| 486 | return AS_OK; |
| 487 | } |
| 488 | |
| 489 | |
| 490 | void |
| 491 | proxyer_handle_return_to_sender(msg* m, uint32_t tid) |
| 492 | { |
| 493 | proxy_request* pr; |
| 494 | cf_mutex* lock; |
| 495 | |
| 496 | if (cf_shash_get_vlock(g_proxy_hash, &tid, (void**)&pr, &lock) != |
| 497 | CF_SHASH_OK) { |
| 498 | // Some other response (or timeout) has already finished this pr. |
| 499 | return; |
| 500 | } |
| 501 | |
| 502 | cf_node redirect_node; |
| 503 | |
| 504 | if (msg_get_uint64(m, PROXY_FIELD_REDIRECT, &redirect_node) == 0 |
| 505 | && redirect_node != g_config.self_node |
| 506 | && redirect_node != (cf_node)0) { |
| 507 | // If this node was a "random" node, i.e. neither acting nor eventual |
| 508 | // master, it diverts to the eventual master (the best it can do.) The |
| 509 | // eventual master must inform this node about the acting master. |
| 510 | |
| 511 | msg_incr_ref(pr->fab_msg); |
| 512 | |
| 513 | if (as_fabric_send(redirect_node, pr->fab_msg, AS_FABRIC_CHANNEL_RW) != |
| 514 | AS_FABRIC_SUCCESS) { |
| 515 | as_fabric_msg_put(pr->fab_msg); |
| 516 | } |
| 517 | |
| 518 | cf_mutex_unlock(lock); |
| 519 | return; |
| 520 | } |
| 521 | |
| 522 | cf_digest* keyd; |
| 523 | |
| 524 | if (msg_get_buf(pr->fab_msg, PROXY_FIELD_DIGEST, (uint8_t**)&keyd, NULL, |
| 525 | MSG_GET_DIRECT) != 0) { |
| 526 | cf_crash(AS_PROXY, "original msg get for digest failed" ); |
| 527 | } |
| 528 | |
| 529 | cl_msg* msgp; |
| 530 | |
| 531 | // TODO - inefficient! Should be a way to 'take' a buffer from msg. |
| 532 | if (msg_get_buf(pr->fab_msg, PROXY_FIELD_AS_PROTO, (uint8_t**)&msgp, NULL, |
| 533 | MSG_GET_COPY_MALLOC) != 0) { |
| 534 | cf_crash(AS_PROXY, "original msg get for proto failed" ); |
| 535 | } |
| 536 | |
| 537 | // Put the as_msg on the normal queue for processing. |
| 538 | as_transaction tr; |
| 539 | as_transaction_init_head(&tr, keyd, msgp); |
| 540 | // msgp might not have digest - batch sub-transactions, old clients. |
| 541 | // For old clients, will compute it again from msgp key and set. |
| 542 | |
| 543 | tr.msg_fields = pr->msg_fields; |
| 544 | tr.origin = pr->origin; |
| 545 | tr.from_flags = pr->from_flags; |
| 546 | tr.from.any = pr->from.any; |
| 547 | tr.from_data.batch_index = pr->batch_index; |
| 548 | tr.start_time = pr->start_time; |
| 549 | |
| 550 | as_service_enqueue_internal(&tr); |
| 551 | |
| 552 | as_fabric_msg_put(pr->fab_msg); |
| 553 | |
| 554 | cf_shash_delete_lockfree(g_proxy_hash, &tid); |
| 555 | cf_mutex_unlock(lock); |
| 556 | } |
| 557 | |
| 558 | |
| 559 | //========================================================== |
| 560 | // Local helpers - proxyee. |
| 561 | // |
| 562 | |
| 563 | void |
| 564 | proxyee_handle_request(cf_node src, msg* m, uint32_t tid) |
| 565 | { |
| 566 | cf_digest* keyd; |
| 567 | |
| 568 | if (msg_get_buf(m, PROXY_FIELD_DIGEST, (uint8_t**)&keyd, NULL, |
| 569 | MSG_GET_DIRECT) != 0) { |
| 570 | cf_warning(AS_PROXY, "msg get for digest failed" ); |
| 571 | error_response(src, tid, AS_ERR_UNKNOWN); |
| 572 | return; |
| 573 | } |
| 574 | |
| 575 | cl_msg* msgp; |
| 576 | size_t msgp_sz; |
| 577 | |
| 578 | if (msg_get_buf(m, PROXY_FIELD_AS_PROTO, (uint8_t**)&msgp, &msgp_sz, |
| 579 | MSG_GET_COPY_MALLOC) != 0) { |
| 580 | cf_warning(AS_PROXY, "msg get for proto failed" ); |
| 581 | error_response(src, tid, AS_ERR_UNKNOWN); |
| 582 | return; |
| 583 | } |
| 584 | |
| 585 | // Sanity check as_proto fields. |
| 586 | as_proto* proto = &msgp->proto; |
| 587 | |
| 588 | if (! as_proto_wrapped_is_valid(proto, msgp_sz)) { |
| 589 | cf_warning(AS_PROXY, "bad proto: version %u, type %u, sz %lu [%lu]" , |
| 590 | proto->version, proto->type, (uint64_t)proto->sz, msgp_sz); |
| 591 | error_response(src, tid, AS_ERR_UNKNOWN); |
| 592 | return; |
| 593 | } |
| 594 | |
| 595 | // Put the as_msg on the normal queue for processing. |
| 596 | as_transaction tr; |
| 597 | as_transaction_init_head(&tr, keyd, msgp); |
| 598 | // msgp might not have digest - batch sub-transactions, old clients. |
| 599 | // For old clients, will compute it again from msgp key and set. |
| 600 | |
| 601 | tr.start_time = cf_getns(); |
| 602 | |
| 603 | tr.origin = FROM_PROXY; |
| 604 | tr.from.proxy_node = src; |
| 605 | tr.from_data.proxy_tid = tid; |
| 606 | |
| 607 | // Proxyer has already done byte swapping in as_msg. |
| 608 | if (! as_transaction_prepare(&tr, false)) { |
| 609 | cf_warning(AS_PROXY, "bad proxy msg" ); |
| 610 | error_response(src, tid, AS_ERR_UNKNOWN); |
| 611 | return; |
| 612 | } |
| 613 | |
| 614 | // For batch sub-transactions, make sure we flag them so they're not |
| 615 | // mistaken for multi-record transactions (which never proxy). |
| 616 | if (as_transaction_has_no_key_or_digest(&tr)) { |
| 617 | tr.from_flags |= FROM_FLAG_BATCH_SUB; |
| 618 | } |
| 619 | |
| 620 | as_service_enqueue_internal(&tr); |
| 621 | } |
| 622 | |
| 623 | |
| 624 | //========================================================== |
| 625 | // Local helpers - timeout. |
| 626 | // |
| 627 | |
| 628 | void* |
| 629 | run_proxy_timeout(void* arg) |
| 630 | { |
| 631 | while (true) { |
| 632 | usleep(75 * 1000); |
| 633 | |
| 634 | now_times now; |
| 635 | |
| 636 | now.now_ns = cf_getns(); |
| 637 | now.now_ms = now.now_ns / 1000000; |
| 638 | |
| 639 | cf_shash_reduce(g_proxy_hash, proxy_timeout_reduce_fn, &now); |
| 640 | } |
| 641 | |
| 642 | return NULL; |
| 643 | } |
| 644 | |
| 645 | |
| 646 | int |
| 647 | proxy_timeout_reduce_fn(const void* key, void* data, void* udata) |
| 648 | { |
| 649 | proxy_request* pr = data; |
| 650 | now_times* now = (now_times*)udata; |
| 651 | |
| 652 | if (now->now_ns < pr->end_time) { |
| 653 | return CF_SHASH_OK; |
| 654 | } |
| 655 | |
| 656 | // Handle timeouts. |
| 657 | |
| 658 | cf_assert(pr->from.any, AS_PROXY, "origin %u has null 'from'" , pr->origin); |
| 659 | |
| 660 | switch (pr->origin) { |
| 661 | case FROM_CLIENT: |
| 662 | // TODO - when it becomes important enough, find a way to echo trid. |
| 663 | as_msg_send_reply(pr->from.proto_fd_h, AS_ERR_TIMEOUT, 0, 0, NULL, NULL, |
| 664 | 0, pr->ns, 0); |
| 665 | client_proxy_update_stats(pr->ns, AS_ERR_TIMEOUT); |
| 666 | break; |
| 667 | case FROM_BATCH: |
| 668 | as_batch_add_error(pr->from.batch_shared, pr->batch_index, |
| 669 | AS_ERR_TIMEOUT); |
| 670 | // Note - no worries about msgp, proxy divert copied it. |
| 671 | batch_sub_proxy_update_stats(pr->ns, AS_ERR_TIMEOUT); |
| 672 | break; |
| 673 | default: |
| 674 | cf_crash(AS_PROXY, "unexpected transaction origin %u" , pr->origin); |
| 675 | break; |
| 676 | } |
| 677 | |
| 678 | pr->from.any = NULL; // pattern, not needed |
| 679 | as_fabric_msg_put(pr->fab_msg); |
| 680 | |
| 681 | return CF_SHASH_REDUCE_DELETE; |
| 682 | } |
| 683 | |
| 684 | |
| 685 | //========================================================== |
| 686 | // Local helpers - handle PROXY fabric messages. |
| 687 | // |
| 688 | |
| 689 | int |
| 690 | proxy_msg_cb(cf_node src, msg* m, void* udata) |
| 691 | { |
| 692 | uint32_t op; |
| 693 | |
| 694 | if (msg_get_uint32(m, PROXY_FIELD_OP, &op) != 0) { |
| 695 | cf_warning(AS_PROXY, "msg get for op failed" ); |
| 696 | as_fabric_msg_put(m); |
| 697 | return 0; |
| 698 | } |
| 699 | |
| 700 | uint32_t tid; |
| 701 | |
| 702 | if (msg_get_uint32(m, PROXY_FIELD_TID, &tid) != 0) { |
| 703 | cf_warning(AS_PROXY, "msg get for tid failed" ); |
| 704 | as_fabric_msg_put(m); |
| 705 | return 0; |
| 706 | } |
| 707 | |
| 708 | switch (op) { |
| 709 | case PROXY_OP_REQUEST: |
| 710 | proxyee_handle_request(src, m, tid); |
| 711 | break; |
| 712 | case PROXY_OP_RESPONSE: |
| 713 | proxyer_handle_response(m, tid); |
| 714 | break; |
| 715 | case PROXY_OP_RETURN_TO_SENDER: |
| 716 | proxyer_handle_return_to_sender(m, tid); |
| 717 | break; |
| 718 | default: |
| 719 | cf_warning(AS_PROXY, "received unexpected message op %u" , op); |
| 720 | break; |
| 721 | } |
| 722 | |
| 723 | as_fabric_msg_put(m); |
| 724 | return 0; |
| 725 | } |
| 726 | |