| 1 | /* |
| 2 | * proto.c |
| 3 | * |
| 4 | * Copyright (C) 2008-2015 Aerospike, Inc. |
| 5 | * |
| 6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
| 7 | * license agreements. |
| 8 | * |
| 9 | * This program is free software: you can redistribute it and/or modify it under |
| 10 | * the terms of the GNU Affero General Public License as published by the Free |
| 11 | * Software Foundation, either version 3 of the License, or (at your option) any |
| 12 | * later version. |
| 13 | * |
| 14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
| 15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
| 17 | * details. |
| 18 | * |
| 19 | * You should have received a copy of the GNU Affero General Public License |
| 20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
| 21 | */ |
| 22 | |
| 23 | //========================================================== |
| 24 | // Includes. |
| 25 | // |
| 26 | |
| 27 | #include "base/proto.h" |
| 28 | |
| 29 | #include <errno.h> |
| 30 | #include <stdbool.h> |
| 31 | #include <stddef.h> |
| 32 | #include <stdint.h> |
| 33 | #include <string.h> |
| 34 | #include <unistd.h> |
| 35 | |
| 36 | #include "aerospike/as_val.h" |
| 37 | #include "citrusleaf/alloc.h" |
| 38 | #include "citrusleaf/cf_byte_order.h" |
| 39 | #include "citrusleaf/cf_digest.h" |
| 40 | #include "citrusleaf/cf_queue.h" |
| 41 | #include "citrusleaf/cf_vector.h" |
| 42 | |
| 43 | #include "cf_thread.h" |
| 44 | #include "dynbuf.h" |
| 45 | #include "fault.h" |
| 46 | #include "socket.h" |
| 47 | |
| 48 | #include "base/as_stap.h" |
| 49 | #include "base/datamodel.h" |
| 50 | #include "base/index.h" |
| 51 | #include "base/thr_tsvc.h" |
| 52 | #include "base/transaction.h" |
| 53 | #include "storage/storage.h" |
| 54 | |
| 55 | |
//==========================================================
// Typedefs & constants.
//

enum {
	// Bytes of stack that as_msg_send_reply() lends to
	// as_msg_make_response_msg() before a heap buffer is needed.
	MSG_STACK_BUFFER_SZ = 16 * 1024,

	// netio_send_packet() stops retrying a non-blocking send (and reports
	// AS_NETIO_CONTINUE) once its EAGAIN retry count exceeds this value.
	NETIO_MAX_IO_RETRY = 5
};

// Names of the single synthetic bin in "val" responses - which one is used
// depends on the caller's 'success' flag.
static const char SUCCESS_BIN_NAME[] = "SUCCESS";
static const char FAILURE_BIN_NAME[] = "FAILURE";
| 65 | |
| 66 | |
//==========================================================
// Globals.
//

// Queues of as_netio items waiting to be (re)sent. Each one is drained by its
// own detached run_netio() thread, started in as_netio_init().
// as_netio_send() pushes here when called with slow == false.
static cf_queue g_netio_queue;
// as_netio_send() pushes here (marking io->slow) when called with
// slow == true; run_netio() sleeps proto_slow_netio_sleep_ms before retrying
// each slow item.
static cf_queue g_netio_slow_queue;


//==========================================================
// Forward declarations.
//

// Blocking send of a complete reply, then end-of-transaction handling.
static int send_reply_buf(as_file_handle *fd_h, uint8_t *msgp, size_t msg_sz);
// Body of each background net-IO thread.
static void *run_netio(void *q_to_wait_on);
// Send a buf-builder's contents from *offset on; may stop early when
// non-blocking.
static int netio_send_packet(as_file_handle *fd_h, cf_buf_builder *bb_r, uint32_t *offset, bool blocking);
| 82 | |
| 83 | |
| 84 | //========================================================== |
| 85 | // Public API - byte swapping. |
| 86 | // |
| 87 | |
| 88 | void |
| 89 | as_proto_swap(as_proto *proto) |
| 90 | { |
| 91 | uint8_t version = proto->version; |
| 92 | uint8_t type = proto->type; |
| 93 | |
| 94 | proto->version = proto->type = 0; |
| 95 | proto->sz = cf_swap_from_be64(*(uint64_t *)proto); |
| 96 | proto->version = version; |
| 97 | proto->type = type; |
| 98 | } |
| 99 | |
| 100 | void |
| 101 | (as_msg *m) |
| 102 | { |
| 103 | m->generation = cf_swap_from_be32(m->generation); |
| 104 | m->record_ttl = cf_swap_from_be32(m->record_ttl); |
| 105 | m->transaction_ttl = cf_swap_from_be32(m->transaction_ttl); |
| 106 | m->n_fields = cf_swap_from_be16(m->n_fields); |
| 107 | m->n_ops = cf_swap_from_be16(m->n_ops); |
| 108 | } |
| 109 | |
| 110 | void |
| 111 | as_msg_swap_field(as_msg_field *mf) |
| 112 | { |
| 113 | mf->field_sz = cf_swap_from_be32(mf->field_sz); |
| 114 | } |
| 115 | |
| 116 | void |
| 117 | as_msg_swap_op(as_msg_op *op) |
| 118 | { |
| 119 | op->op_sz = cf_swap_from_be32(op->op_sz); |
| 120 | } |
| 121 | |
| 122 | |
| 123 | //========================================================== |
| 124 | // Public API - generating internal transactions. |
| 125 | // |
| 126 | |
| 127 | // Allocates cl_msg returned - caller must free it. Everything is host-ordered. |
| 128 | // Will add more parameters (e.g. for set name) only as they become necessary. |
| 129 | cl_msg * |
| 130 | as_msg_create_internal(const char *ns_name, uint8_t info1, uint8_t info2, |
| 131 | uint8_t info3, uint16_t n_ops, uint8_t *ops, size_t ops_sz) |
| 132 | { |
| 133 | size_t ns_name_len = strlen(ns_name); |
| 134 | |
| 135 | size_t msg_sz = sizeof(cl_msg) + |
| 136 | sizeof(as_msg_field) + ns_name_len + |
| 137 | ops_sz; |
| 138 | |
| 139 | cl_msg *msgp = (cl_msg *)cf_malloc(msg_sz); |
| 140 | |
| 141 | msgp->proto.version = PROTO_VERSION; |
| 142 | msgp->proto.type = PROTO_TYPE_AS_MSG; |
| 143 | msgp->proto.sz = msg_sz - sizeof(as_proto); |
| 144 | |
| 145 | as_msg *m = &msgp->msg; |
| 146 | |
| 147 | m->header_sz = sizeof(as_msg); |
| 148 | m->info1 = info1; |
| 149 | m->info2 = info2; |
| 150 | m->info3 = info3; |
| 151 | m->unused = 0; |
| 152 | m->result_code = 0; |
| 153 | m->generation = 0; |
| 154 | m->record_ttl = 0; |
| 155 | m->transaction_ttl = 0; |
| 156 | m->n_fields = 1; |
| 157 | m->n_ops = n_ops; |
| 158 | |
| 159 | as_msg_field *mf = (as_msg_field *)(m->data); |
| 160 | |
| 161 | mf->type = AS_MSG_FIELD_TYPE_NAMESPACE; |
| 162 | mf->field_sz = (uint32_t)ns_name_len + 1; |
| 163 | memcpy(mf->data, ns_name, ns_name_len); |
| 164 | |
| 165 | if (ops != NULL) { |
| 166 | uint8_t *msg_ops = (uint8_t *)as_msg_field_get_next(mf); |
| 167 | |
| 168 | memcpy(msg_ops, ops, ops_sz); |
| 169 | } |
| 170 | |
| 171 | return msgp; |
| 172 | } |
| 173 | |
| 174 | |
| 175 | //========================================================== |
| 176 | // Public API - packing responses. |
| 177 | // |
| 178 | |
| 179 | // Allocates cl_msg returned - caller must free it. |
| 180 | cl_msg * |
| 181 | as_msg_make_response_msg(uint32_t result_code, uint32_t generation, |
| 182 | uint32_t void_time, as_msg_op **ops, as_bin **bins, uint16_t bin_count, |
| 183 | as_namespace *ns, cl_msg *msgp_in, size_t *msg_sz_in, uint64_t trid) |
| 184 | { |
| 185 | uint16_t n_fields = 0; |
| 186 | size_t msg_sz = sizeof(cl_msg); |
| 187 | |
| 188 | if (trid != 0) { |
| 189 | n_fields++; |
| 190 | msg_sz += sizeof(as_msg_field) + sizeof(trid); |
| 191 | } |
| 192 | |
| 193 | msg_sz += sizeof(as_msg_op) * bin_count; |
| 194 | |
| 195 | for (uint16_t i = 0; i < bin_count; i++) { |
| 196 | if (ops) { |
| 197 | msg_sz += ops[i]->name_sz; |
| 198 | } |
| 199 | else if (bins[i]) { |
| 200 | msg_sz += ns->single_bin ? |
| 201 | 0 : strlen(as_bin_get_name_from_id(ns, bins[i]->id)); |
| 202 | } |
| 203 | else { |
| 204 | cf_crash(AS_PROTO, "making response message with null bin and op" ); |
| 205 | } |
| 206 | |
| 207 | if (bins[i]) { |
| 208 | msg_sz += as_bin_particle_client_value_size(bins[i]); |
| 209 | } |
| 210 | } |
| 211 | |
| 212 | uint8_t *buf; |
| 213 | |
| 214 | if (! msgp_in || *msg_sz_in < msg_sz) { |
| 215 | buf = cf_malloc(msg_sz); |
| 216 | } |
| 217 | else { |
| 218 | buf = (uint8_t *)msgp_in; |
| 219 | } |
| 220 | |
| 221 | *msg_sz_in = msg_sz; |
| 222 | |
| 223 | cl_msg *msgp = (cl_msg *)buf; |
| 224 | |
| 225 | msgp->proto.version = PROTO_VERSION; |
| 226 | msgp->proto.type = PROTO_TYPE_AS_MSG; |
| 227 | msgp->proto.sz = msg_sz - sizeof(as_proto); |
| 228 | |
| 229 | as_proto_swap(&msgp->proto); |
| 230 | |
| 231 | as_msg *m = &msgp->msg; |
| 232 | |
| 233 | m->header_sz = sizeof(as_msg); |
| 234 | m->info1 = 0; |
| 235 | m->info2 = 0; |
| 236 | m->info3 = 0; |
| 237 | m->unused = 0; |
| 238 | m->result_code = result_code; |
| 239 | m->generation = generation == 0 ? 0 : plain_generation(generation, ns); |
| 240 | m->record_ttl = void_time; |
| 241 | m->transaction_ttl = 0; |
| 242 | m->n_fields = n_fields; |
| 243 | m->n_ops = bin_count; |
| 244 | |
| 245 | as_msg_swap_header(m); |
| 246 | |
| 247 | buf = m->data; |
| 248 | |
| 249 | if (trid != 0) { |
| 250 | as_msg_field *mf = (as_msg_field *)buf; |
| 251 | |
| 252 | mf->field_sz = 1 + sizeof(uint64_t); |
| 253 | mf->type = AS_MSG_FIELD_TYPE_TRID; |
| 254 | *(uint64_t *)mf->data = cf_swap_to_be64(trid); |
| 255 | as_msg_swap_field(mf); |
| 256 | buf += sizeof(as_msg_field) + sizeof(uint64_t); |
| 257 | } |
| 258 | |
| 259 | for (uint16_t i = 0; i < bin_count; i++) { |
| 260 | as_msg_op *op = (as_msg_op *)buf; |
| 261 | |
| 262 | op->version = 0; |
| 263 | |
| 264 | if (ops) { |
| 265 | op->op = ops[i]->op; |
| 266 | memcpy(op->name, ops[i]->name, ops[i]->name_sz); |
| 267 | op->name_sz = ops[i]->name_sz; |
| 268 | } |
| 269 | else { |
| 270 | op->op = AS_MSG_OP_READ; |
| 271 | op->name_sz = as_bin_memcpy_name(ns, op->name, bins[i]); |
| 272 | } |
| 273 | |
| 274 | op->op_sz = OP_FIXED_SZ + op->name_sz; |
| 275 | |
| 276 | buf += sizeof(as_msg_op) + op->name_sz; |
| 277 | buf += as_bin_particle_to_client(bins[i], op); |
| 278 | |
| 279 | as_msg_swap_op(op); |
| 280 | } |
| 281 | |
| 282 | return msgp; |
| 283 | } |
| 284 | |
| 285 | // Pass NULL bb_r for sizing only. Return value is size if >= 0, error if < 0. |
| 286 | int32_t |
| 287 | as_msg_make_response_bufbuilder(cf_buf_builder **bb_r, as_storage_rd *rd, |
| 288 | bool no_bin_data, cf_vector *select_bins) |
| 289 | { |
| 290 | as_namespace *ns = rd->ns; |
| 291 | as_record *r = rd->r; |
| 292 | |
| 293 | size_t ns_len = strlen(ns->name); |
| 294 | const char *set_name = as_index_get_set_name(r, ns); |
| 295 | size_t set_name_len = set_name ? strlen(set_name) : 0; |
| 296 | |
| 297 | const uint8_t* key = NULL; |
| 298 | uint32_t key_size = 0; |
| 299 | |
| 300 | if (r->key_stored == 1) { |
| 301 | if (! as_storage_record_get_key(rd)) { |
| 302 | cf_warning(AS_PROTO, "can't get key - skipping record" ); |
| 303 | return -1; |
| 304 | } |
| 305 | |
| 306 | key = rd->key; |
| 307 | key_size = rd->key_size; |
| 308 | } |
| 309 | |
| 310 | uint16_t n_fields = 2; // always add namespace and digest |
| 311 | size_t msg_sz = sizeof(as_msg) + |
| 312 | sizeof(as_msg_field) + ns_len + |
| 313 | sizeof(as_msg_field) + sizeof(cf_digest); |
| 314 | |
| 315 | if (set_name) { |
| 316 | n_fields++; |
| 317 | msg_sz += sizeof(as_msg_field) + set_name_len; |
| 318 | } |
| 319 | |
| 320 | if (key) { |
| 321 | n_fields++; |
| 322 | msg_sz += sizeof(as_msg_field) + key_size; |
| 323 | } |
| 324 | |
| 325 | uint32_t n_select_bins = 0; |
| 326 | uint16_t n_bins_matched = 0; |
| 327 | uint16_t n_record_bins = 0; |
| 328 | |
| 329 | if (! no_bin_data) { |
| 330 | if (select_bins) { |
| 331 | n_select_bins = cf_vector_size(select_bins); |
| 332 | |
| 333 | for (uint32_t i = 0; i < n_select_bins; i++) { |
| 334 | char bin_name[AS_BIN_NAME_MAX_SZ]; |
| 335 | |
| 336 | cf_vector_get(select_bins, i, (void*)&bin_name); |
| 337 | |
| 338 | as_bin *b = as_bin_get(rd, bin_name); |
| 339 | |
| 340 | if (! b) { |
| 341 | continue; |
| 342 | } |
| 343 | |
| 344 | msg_sz += sizeof(as_msg_op); |
| 345 | msg_sz += ns->single_bin ? 0 : strlen(bin_name); |
| 346 | msg_sz += as_bin_particle_client_value_size(b); |
| 347 | |
| 348 | n_bins_matched++; |
| 349 | } |
| 350 | |
| 351 | // Don't return an empty record. |
| 352 | if (n_bins_matched == 0) { |
| 353 | return 0; |
| 354 | } |
| 355 | } |
| 356 | else { |
| 357 | n_record_bins = as_bin_inuse_count(rd); |
| 358 | |
| 359 | msg_sz += sizeof(as_msg_op) * n_record_bins; |
| 360 | |
| 361 | for (uint16_t i = 0; i < n_record_bins; i++) { |
| 362 | as_bin *b = &rd->bins[i]; |
| 363 | |
| 364 | msg_sz += ns->single_bin ? |
| 365 | 0 : strlen(as_bin_get_name_from_id(ns, b->id)); |
| 366 | msg_sz += (int)as_bin_particle_client_value_size(b); |
| 367 | } |
| 368 | } |
| 369 | } |
| 370 | |
| 371 | // NULL buf-builder means just return size. |
| 372 | if (! bb_r) { |
| 373 | return (int32_t)msg_sz; |
| 374 | } |
| 375 | |
| 376 | uint8_t *buf; |
| 377 | |
| 378 | cf_buf_builder_reserve(bb_r, (int)msg_sz, &buf); |
| 379 | |
| 380 | as_msg *m = (as_msg *)buf; |
| 381 | |
| 382 | m->header_sz = sizeof(as_msg); |
| 383 | m->info1 = no_bin_data ? AS_MSG_INFO1_GET_NO_BINS : 0; |
| 384 | m->info2 = 0; |
| 385 | m->info3 = 0; |
| 386 | m->unused = 0; |
| 387 | m->result_code = AS_OK; |
| 388 | m->generation = plain_generation(r->generation, ns); |
| 389 | m->record_ttl = r->void_time; |
| 390 | m->transaction_ttl = 0; |
| 391 | m->n_fields = n_fields; |
| 392 | |
| 393 | if (no_bin_data) { |
| 394 | m->n_ops = 0; |
| 395 | } |
| 396 | else { |
| 397 | m->n_ops = select_bins ? n_bins_matched : n_record_bins; |
| 398 | } |
| 399 | |
| 400 | as_msg_swap_header(m); |
| 401 | |
| 402 | buf = m->data; |
| 403 | |
| 404 | as_msg_field *mf = (as_msg_field *)buf; |
| 405 | |
| 406 | mf->field_sz = ns_len + 1; |
| 407 | mf->type = AS_MSG_FIELD_TYPE_NAMESPACE; |
| 408 | memcpy(mf->data, ns->name, ns_len); |
| 409 | as_msg_swap_field(mf); |
| 410 | buf += sizeof(as_msg_field) + ns_len; |
| 411 | |
| 412 | mf = (as_msg_field *)buf; |
| 413 | mf->field_sz = sizeof(cf_digest) + 1; |
| 414 | mf->type = AS_MSG_FIELD_TYPE_DIGEST_RIPE; |
| 415 | memcpy(mf->data, &r->keyd, sizeof(cf_digest)); |
| 416 | as_msg_swap_field(mf); |
| 417 | buf += sizeof(as_msg_field) + sizeof(cf_digest); |
| 418 | |
| 419 | if (set_name) { |
| 420 | mf = (as_msg_field *)buf; |
| 421 | mf->field_sz = set_name_len + 1; |
| 422 | mf->type = AS_MSG_FIELD_TYPE_SET; |
| 423 | memcpy(mf->data, set_name, set_name_len); |
| 424 | as_msg_swap_field(mf); |
| 425 | buf += sizeof(as_msg_field) + set_name_len; |
| 426 | } |
| 427 | |
| 428 | if (key) { |
| 429 | mf = (as_msg_field *)buf; |
| 430 | mf->field_sz = key_size + 1; |
| 431 | mf->type = AS_MSG_FIELD_TYPE_KEY; |
| 432 | memcpy(mf->data, key, key_size); |
| 433 | as_msg_swap_field(mf); |
| 434 | buf += sizeof(as_msg_field) + key_size; |
| 435 | } |
| 436 | |
| 437 | if (no_bin_data) { |
| 438 | return (int32_t)msg_sz; |
| 439 | } |
| 440 | |
| 441 | if (select_bins) { |
| 442 | for (uint32_t i = 0; i < n_select_bins; i++) { |
| 443 | char bin_name[AS_BIN_NAME_MAX_SZ]; |
| 444 | |
| 445 | cf_vector_get(select_bins, i, (void*)&bin_name); |
| 446 | |
| 447 | as_bin *b = as_bin_get(rd, bin_name); |
| 448 | |
| 449 | if (! b) { |
| 450 | continue; |
| 451 | } |
| 452 | |
| 453 | as_msg_op *op = (as_msg_op *)buf; |
| 454 | |
| 455 | op->op = AS_MSG_OP_READ; |
| 456 | op->version = 0; |
| 457 | op->name_sz = as_bin_memcpy_name(ns, op->name, b); |
| 458 | op->op_sz = OP_FIXED_SZ + op->name_sz; |
| 459 | |
| 460 | buf += sizeof(as_msg_op) + op->name_sz; |
| 461 | buf += as_bin_particle_to_client(b, op); |
| 462 | |
| 463 | as_msg_swap_op(op); |
| 464 | } |
| 465 | } |
| 466 | else { |
| 467 | for (uint16_t i = 0; i < n_record_bins; i++) { |
| 468 | as_msg_op *op = (as_msg_op *)buf; |
| 469 | |
| 470 | op->op = AS_MSG_OP_READ; |
| 471 | op->version = 0; |
| 472 | op->name_sz = as_bin_memcpy_name(ns, op->name, &rd->bins[i]); |
| 473 | op->op_sz = OP_FIXED_SZ + op->name_sz; |
| 474 | |
| 475 | buf += sizeof(as_msg_op) + op->name_sz; |
| 476 | buf += as_bin_particle_to_client(&rd->bins[i], op); |
| 477 | |
| 478 | as_msg_swap_op(op); |
| 479 | } |
| 480 | } |
| 481 | |
| 482 | return (int32_t)msg_sz; |
| 483 | } |
| 484 | |
| 485 | cl_msg * |
| 486 | as_msg_make_val_response(bool success, const as_val *val, uint32_t result_code, |
| 487 | uint32_t generation, uint32_t void_time, uint64_t trid, |
| 488 | size_t *p_msg_sz) |
| 489 | { |
| 490 | const char *bin_name; |
| 491 | size_t bin_name_len; |
| 492 | |
| 493 | if (success) { |
| 494 | bin_name = SUCCESS_BIN_NAME; |
| 495 | bin_name_len = sizeof(SUCCESS_BIN_NAME) - 1; |
| 496 | } |
| 497 | else { |
| 498 | bin_name = FAILURE_BIN_NAME; |
| 499 | bin_name_len = sizeof(FAILURE_BIN_NAME) - 1; |
| 500 | } |
| 501 | |
| 502 | uint16_t n_fields = 0; |
| 503 | size_t msg_sz = sizeof(cl_msg); |
| 504 | |
| 505 | if (trid != 0) { |
| 506 | n_fields++; |
| 507 | msg_sz += sizeof(as_msg_field) + sizeof(trid); |
| 508 | } |
| 509 | |
| 510 | msg_sz += sizeof(as_msg_op) + bin_name_len + |
| 511 | as_particle_asval_client_value_size(val); |
| 512 | |
| 513 | uint8_t *buf = cf_malloc(msg_sz); |
| 514 | cl_msg *msgp = (cl_msg *)buf; |
| 515 | |
| 516 | msgp->proto.version = PROTO_VERSION; |
| 517 | msgp->proto.type = PROTO_TYPE_AS_MSG; |
| 518 | msgp->proto.sz = msg_sz - sizeof(as_proto); |
| 519 | |
| 520 | as_proto_swap(&msgp->proto); |
| 521 | |
| 522 | as_msg *m = &msgp->msg; |
| 523 | |
| 524 | m->header_sz = sizeof(as_msg); |
| 525 | m->info1 = 0; |
| 526 | m->info2 = 0; |
| 527 | m->info3 = 0; |
| 528 | m->unused = 0; |
| 529 | m->result_code = result_code; |
| 530 | m->generation = generation; |
| 531 | m->record_ttl = void_time; |
| 532 | m->transaction_ttl = 0; |
| 533 | m->n_fields = n_fields; |
| 534 | m->n_ops = 1; // only the one special bin |
| 535 | |
| 536 | as_msg_swap_header(m); |
| 537 | |
| 538 | buf = m->data; |
| 539 | |
| 540 | if (trid != 0) { |
| 541 | as_msg_field *mf = (as_msg_field *)buf; |
| 542 | |
| 543 | mf->field_sz = 1 + sizeof(uint64_t); |
| 544 | mf->type = AS_MSG_FIELD_TYPE_TRID; |
| 545 | *(uint64_t *)mf->data = cf_swap_to_be64(trid); |
| 546 | as_msg_swap_field(mf); |
| 547 | buf += sizeof(as_msg_field) + sizeof(uint64_t); |
| 548 | } |
| 549 | |
| 550 | as_msg_op *op = (as_msg_op *)buf; |
| 551 | |
| 552 | op->op = AS_MSG_OP_READ; |
| 553 | op->name_sz = (uint8_t)bin_name_len; |
| 554 | memcpy(op->name, bin_name, op->name_sz); |
| 555 | op->op_sz = OP_FIXED_SZ + op->name_sz; |
| 556 | op->version = 0; |
| 557 | |
| 558 | as_particle_asval_to_client(val, op); |
| 559 | |
| 560 | as_msg_swap_op(op); |
| 561 | |
| 562 | *p_msg_sz = msg_sz; |
| 563 | |
| 564 | return msgp; |
| 565 | } |
| 566 | |
| 567 | // Caller-provided val_sz must be the result of calling |
| 568 | // as_particle_asval_client_value_size() for same val. |
| 569 | void |
| 570 | as_msg_make_val_response_bufbuilder(const as_val *val, cf_buf_builder **bb_r, |
| 571 | uint32_t val_sz, bool success) |
| 572 | { |
| 573 | const char *bin_name; |
| 574 | size_t bin_name_len; |
| 575 | |
| 576 | if (success) { |
| 577 | bin_name = SUCCESS_BIN_NAME; |
| 578 | bin_name_len = sizeof(SUCCESS_BIN_NAME) - 1; |
| 579 | } |
| 580 | else { |
| 581 | bin_name = FAILURE_BIN_NAME; |
| 582 | bin_name_len = sizeof(FAILURE_BIN_NAME) - 1; |
| 583 | } |
| 584 | |
| 585 | size_t msg_sz = sizeof(as_msg) + sizeof(as_msg_op) + bin_name_len + val_sz; |
| 586 | |
| 587 | uint8_t *buf; |
| 588 | |
| 589 | cf_buf_builder_reserve(bb_r, (int)msg_sz, &buf); |
| 590 | |
| 591 | as_msg *m = (as_msg *)buf; |
| 592 | |
| 593 | m->header_sz = sizeof(as_msg); |
| 594 | m->info1 = 0; |
| 595 | m->info2 = 0; |
| 596 | m->info3 = 0; |
| 597 | m->unused = 0; |
| 598 | m->result_code = AS_OK; |
| 599 | m->generation = 0; |
| 600 | m->record_ttl = 0; |
| 601 | m->transaction_ttl = 0; |
| 602 | m->n_fields = 0; |
| 603 | m->n_ops = 1; // only the one special bin |
| 604 | |
| 605 | as_msg_swap_header(m); |
| 606 | |
| 607 | as_msg_op *op = (as_msg_op *)m->data; |
| 608 | |
| 609 | op->op = AS_MSG_OP_READ; |
| 610 | op->name_sz = (uint8_t)bin_name_len; |
| 611 | memcpy(op->name, bin_name, op->name_sz); |
| 612 | op->op_sz = OP_FIXED_SZ + op->name_sz; |
| 613 | op->version = 0; |
| 614 | |
| 615 | as_particle_asval_to_client(val, op); |
| 616 | |
| 617 | as_msg_swap_op(op); |
| 618 | } |
| 619 | |
| 620 | |
| 621 | //========================================================== |
| 622 | // Public API - sending responses to client. |
| 623 | // |
| 624 | |
| 625 | // Make an individual transaction response and send it. |
| 626 | int |
| 627 | as_msg_send_reply(as_file_handle *fd_h, uint32_t result_code, |
| 628 | uint32_t generation, uint32_t void_time, as_msg_op **ops, as_bin **bins, |
| 629 | uint16_t bin_count, as_namespace *ns, uint64_t trid) |
| 630 | { |
| 631 | uint8_t stack_buf[MSG_STACK_BUFFER_SZ]; |
| 632 | size_t msg_sz = sizeof(stack_buf); |
| 633 | uint8_t *msgp = (uint8_t *)as_msg_make_response_msg(result_code, generation, |
| 634 | void_time, ops, bins, bin_count, ns, (cl_msg *)stack_buf, &msg_sz, |
| 635 | trid); |
| 636 | |
| 637 | int rv = send_reply_buf(fd_h, msgp, msg_sz); |
| 638 | |
| 639 | if (msgp != stack_buf) { |
| 640 | cf_free(msgp); |
| 641 | } |
| 642 | |
| 643 | return rv; |
| 644 | } |
| 645 | |
| 646 | // Send a pre-made response saved in a dyn-buf. |
| 647 | int |
| 648 | as_msg_send_ops_reply(as_file_handle *fd_h, cf_dyn_buf *db) |
| 649 | { |
| 650 | return send_reply_buf(fd_h, db->buf, db->used_sz); |
| 651 | } |
| 652 | |
| 653 | // Send a blocking "fin" message with default timeout. |
| 654 | bool |
| 655 | as_msg_send_fin(cf_socket *sock, uint32_t result_code) |
| 656 | { |
| 657 | return as_msg_send_fin_timeout(sock, result_code, CF_SOCKET_TIMEOUT) != 0; |
| 658 | } |
| 659 | |
| 660 | // Send a blocking "fin" message with a specified timeout. |
| 661 | size_t |
| 662 | as_msg_send_fin_timeout(cf_socket *sock, uint32_t result_code, int32_t timeout) |
| 663 | { |
| 664 | cl_msg msgp; |
| 665 | |
| 666 | msgp.proto.version = PROTO_VERSION; |
| 667 | msgp.proto.type = PROTO_TYPE_AS_MSG; |
| 668 | msgp.proto.sz = sizeof(as_msg); |
| 669 | |
| 670 | as_proto_swap(&msgp.proto); |
| 671 | |
| 672 | as_msg *m = &msgp.msg; |
| 673 | |
| 674 | m->header_sz = sizeof(as_msg); |
| 675 | m->info1 = 0; |
| 676 | m->info2 = 0; |
| 677 | m->info3 = AS_MSG_INFO3_LAST; |
| 678 | m->unused = 0; |
| 679 | m->result_code = result_code; |
| 680 | m->generation = 0; |
| 681 | m->record_ttl = 0; |
| 682 | m->transaction_ttl = 0; |
| 683 | m->n_fields = 0; |
| 684 | m->n_ops = 0; |
| 685 | |
| 686 | as_msg_swap_header(m); |
| 687 | |
| 688 | if (cf_socket_send_all(sock, (uint8_t*)&msgp, sizeof(msgp), MSG_NOSIGNAL, |
| 689 | timeout) < 0) { |
| 690 | cf_warning(AS_PROTO, "send error - fd %d %s" , CSFD(sock), |
| 691 | cf_strerror(errno)); |
| 692 | return 0; |
| 693 | } |
| 694 | |
| 695 | return sizeof(cl_msg); |
| 696 | } |
| 697 | |
| 698 | |
| 699 | //========================================================== |
| 700 | // Public API - query "net-IO" responses. |
| 701 | // |
| 702 | |
| 703 | void |
| 704 | as_netio_init() |
| 705 | { |
| 706 | cf_queue_init(&g_netio_queue, sizeof(as_netio), 64, true); |
| 707 | cf_queue_init(&g_netio_slow_queue, sizeof(as_netio), 64, true); |
| 708 | |
| 709 | cf_thread_create_detached(run_netio, (void *)&g_netio_queue); |
| 710 | cf_thread_create_detached(run_netio, (void *)&g_netio_slow_queue); |
| 711 | } |
| 712 | |
| 713 | // Based on io object, send buffer to the network, or queue for retry. |
| 714 | // |
| 715 | // start_cb: Callback to the module before the real IO is started. Returns: |
| 716 | // AS_NETIO_OK: Everything ok, go ahead with IO. |
| 717 | // AS_NETIO_ERR: If there was issue like abort/err/timeout etc. |
| 718 | // |
| 719 | // finish_cb: Callback to module with status code of the IO call. Returns: |
| 720 | // AS_NETIO_OK: Everything ok. |
| 721 | // AS_NETIO_CONTINUE: The IO was requeued. |
| 722 | // AS_NETIO_ERR: IO erred out due to some issue. |
| 723 | // |
| 724 | // finish_cb should do the needful like release ref to user data etc. |
| 725 | // |
| 726 | // Returns: |
| 727 | // AS_NETIO_OK: Everything is fine, both start_cb & finish_cb were called. |
| 728 | // AS_NETIO_ERR: Something failed either calling start_cb or while doing |
| 729 | // network IO, finish_cb is called. |
| 730 | // |
| 731 | // This function consumes qtr reference. It calls finish_cb which releases ref |
| 732 | // to qtr. In case of AS_NETIO_CONTINUE: this function also consumes bb_r and |
| 733 | // ref for fd_h. The background thread is responsible for freeing up bb_r and |
| 734 | // releasing ref to fd_h. |
| 735 | int |
| 736 | as_netio_send(as_netio *io, bool slow, bool blocking) |
| 737 | { |
| 738 | int ret = io->start_cb(io, io->seq); |
| 739 | |
| 740 | if (ret == AS_NETIO_OK) { |
| 741 | ret = io->finish_cb(io, netio_send_packet(io->fd_h, io->bb_r, |
| 742 | &io->offset, blocking)); |
| 743 | } |
| 744 | else { |
| 745 | ret = io->finish_cb(io, ret); |
| 746 | } |
| 747 | |
| 748 | // If needs requeue then requeue it. |
| 749 | switch (ret) { |
| 750 | case AS_NETIO_CONTINUE: |
| 751 | if (slow) { |
| 752 | io->slow = true; |
| 753 | cf_queue_push(&g_netio_slow_queue, io); |
| 754 | } |
| 755 | else { |
| 756 | cf_queue_push(&g_netio_queue, io); |
| 757 | } |
| 758 | break; |
| 759 | default: |
| 760 | ret = AS_NETIO_OK; |
| 761 | break; |
| 762 | } |
| 763 | |
| 764 | return ret; |
| 765 | } |
| 766 | |
| 767 | |
| 768 | //========================================================== |
| 769 | // Local helpers. |
| 770 | // |
| 771 | |
| 772 | static int |
| 773 | send_reply_buf(as_file_handle *fd_h, uint8_t *msgp, size_t msg_sz) |
| 774 | { |
| 775 | cf_assert(cf_socket_exists(&fd_h->sock), AS_PROTO, "fd is invalid" ); |
| 776 | |
| 777 | if (cf_socket_send_all(&fd_h->sock, msgp, msg_sz, MSG_NOSIGNAL, |
| 778 | CF_SOCKET_TIMEOUT) < 0) { |
| 779 | // Common when a client aborts. |
| 780 | cf_debug(AS_PROTO, "protocol write fail: fd %d sz %zu errno %d" , |
| 781 | CSFD(&fd_h->sock), msg_sz, errno); |
| 782 | |
| 783 | as_end_of_transaction_force_close(fd_h); |
| 784 | return -1; |
| 785 | } |
| 786 | |
| 787 | as_end_of_transaction_ok(fd_h); |
| 788 | return 0; |
| 789 | } |
| 790 | |
| 791 | static void * |
| 792 | run_netio(void *q_to_wait_on) |
| 793 | { |
| 794 | cf_queue *q = (cf_queue*)q_to_wait_on; |
| 795 | |
| 796 | while (true) { |
| 797 | as_netio io; |
| 798 | |
| 799 | if (cf_queue_pop(q, &io, CF_QUEUE_FOREVER) != 0) { |
| 800 | cf_crash(AS_PROTO, "failed to pop from IO worker queue." ); |
| 801 | } |
| 802 | |
| 803 | if (io.slow) { |
| 804 | usleep(g_config.proto_slow_netio_sleep_ms * 1000); |
| 805 | } |
| 806 | |
| 807 | as_netio_send(&io, true, false); |
| 808 | } |
| 809 | |
| 810 | return NULL; |
| 811 | } |
| 812 | |
| 813 | static int |
| 814 | netio_send_packet(as_file_handle *fd_h, cf_buf_builder *bb_r, uint32_t *offset, |
| 815 | bool blocking) |
| 816 | { |
| 817 | #if defined(USE_SYSTEMTAP) |
| 818 | uint64_t nodeid = g_config.self_node; |
| 819 | #endif |
| 820 | |
| 821 | uint32_t len = bb_r->used_sz; |
| 822 | uint8_t *buf = bb_r->buf; |
| 823 | |
| 824 | as_proto proto; |
| 825 | |
| 826 | proto.version = PROTO_VERSION; |
| 827 | proto.type = PROTO_TYPE_AS_MSG; |
| 828 | proto.sz = len - 8; |
| 829 | as_proto_swap(&proto); |
| 830 | |
| 831 | memcpy(bb_r->buf, &proto, 8); |
| 832 | |
| 833 | uint32_t pos = *offset; |
| 834 | |
| 835 | ASD_QUERY_SENDPACKET_STARTING(nodeid, pos, len); |
| 836 | |
| 837 | int retry = 0; |
| 838 | |
| 839 | cf_detail(AS_PROTO," start at %p %d %d" , buf, pos, len); |
| 840 | |
| 841 | while (pos < len) { |
| 842 | int rv = cf_socket_send(&fd_h->sock, buf + pos, len - pos, |
| 843 | MSG_NOSIGNAL); |
| 844 | |
| 845 | if (rv <= 0) { |
| 846 | if (errno != EAGAIN) { |
| 847 | cf_debug(AS_PROTO, "packet send response error returned %d errno %d fd %d" , |
| 848 | rv, errno, CSFD(&fd_h->sock)); |
| 849 | return AS_NETIO_IO_ERR; |
| 850 | } |
| 851 | |
| 852 | if (! blocking && (retry > NETIO_MAX_IO_RETRY)) { |
| 853 | *offset = pos; |
| 854 | cf_detail(AS_PROTO," end at %p %d %d" , buf, pos, len); |
| 855 | ASD_QUERY_SENDPACKET_CONTINUE(nodeid, pos); |
| 856 | return AS_NETIO_CONTINUE; |
| 857 | } |
| 858 | |
| 859 | retry++; |
| 860 | // bigger packets so try few extra times |
| 861 | usleep(100); |
| 862 | } |
| 863 | else { |
| 864 | pos += rv; |
| 865 | } |
| 866 | } |
| 867 | |
| 868 | ASD_QUERY_SENDPACKET_FINISHED(nodeid); |
| 869 | return AS_NETIO_OK; |
| 870 | } |
| 871 | |