/*
 * batch.c
 *
 * Copyright (C) 2012-2015 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/
 */

#include "base/batch.h"
#include "aerospike/as_buffer_pool.h"
#include "aerospike/as_thread_pool.h"
#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_atomic.h"
#include "citrusleaf/cf_byte_order.h"
#include "citrusleaf/cf_clock.h"
#include "citrusleaf/cf_digest.h"
#include "citrusleaf/cf_queue.h"
#include "base/cfg.h"
#include "base/datamodel.h"
#include "base/index.h"
#include "base/predexp.h"
#include "base/proto.h"
#include "base/security.h"
#include "base/service.h"
#include "base/stats.h"
#include "base/thr_tsvc.h"
#include "base/transaction.h"
#include "cf_mutex.h"
#include "hardware.h"
#include "socket.h"
#include <errno.h>
#include <pthread.h>
#include <unistd.h>

//---------------------------------------------------------
// MACROS
//---------------------------------------------------------

#define BATCH_BLOCK_SIZE (1024 * 128) // 128K
#define BATCH_REPEAT_SIZE 25 // index(4), digest(20) and repeat(1)

#define BATCH_ABANDON_LIMIT (30UL * 1000 * 1000 * 1000) // 30 seconds

#define BATCH_SUCCESS 0
#define BATCH_ERROR -1
#define BATCH_DELAY 1
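
// Send status protocol used by the as_batch_send_*() functions below:
// BATCH_SUCCESS - the entire buffer was written to the client socket.
// BATCH_ERROR - the socket failed; the batch's response fd is marked bad.
// BATCH_DELAY - the socket would block; the buffer is re-queued and sending
// resumes later from shared->buffer_offset.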

//---------------------------------------------------------
// TYPES
//---------------------------------------------------------

// Pad the batch input header to 30 bytes, which is also the size of a
// transaction cl_msg header. This allows the input memory to be reused as
// transaction cl_msg memory, saving a large number of memory allocations
// while still allowing different namespaces/bin name filters in the same
// batch.
typedef struct {
	uint32_t index;
	cf_digest keyd;
	uint8_t repeat;
	uint8_t info1;
	uint16_t n_fields;
	uint16_t n_ops;
} __attribute__((__packed__)) as_batch_input;
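
// Layout check: index(4) + keyd(20) + repeat(1) + info1(1) + n_fields(2) +
// n_ops(2) = 30 bytes, matching sizeof(cl_msg) under the packed layout the
// comment above assumes (8-byte as_proto plus 22-byte as_msg).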

typedef struct {
	uint32_t capacity;
	uint32_t size;
	uint32_t tran_count;
	cf_atomic32 writers;
	as_proto proto;
	uint8_t data[];
} __attribute__((__packed__)) as_batch_buffer;
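
// A note on writers: a freshly popped buffer starts with writers == 2 (see
// as_batch_buffer_pop()) - one reference for the transaction that reserved
// the first slot, and one held on behalf of the batch until the buffer is
// retired, either when a new buffer replaces it in as_batch_reserve() or at
// batch completion in as_batch_transaction_end(). The buffer is flushed to
// the response queue only when writers reaches zero - see
// as_batch_buffer_complete().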
| 86 | |
| 87 | struct as_batch_shared_s { |
| 88 | cf_mutex lock; |
| 89 | cf_queue* response_queue; |
| 90 | as_file_handle* fd_h; |
| 91 | cl_msg* msgp; |
| 92 | as_batch_buffer* buffer; |
| 93 | uint64_t start; |
| 94 | uint64_t end; |
| 95 | uint32_t tran_count_response; |
| 96 | uint32_t tran_count; |
| 97 | uint32_t tran_max; |
| 98 | uint32_t buffer_offset; |
| 99 | as_batch_buffer* delayed_buffer; |
| 100 | int result_code; |
| 101 | bool in_trailer; |
| 102 | bool bad_response_fd; |
| 103 | as_msg_field* predexp_mf; |
| 104 | predexp_eval_t* predexp; |
| 105 | }; |
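
// Of the three counters above: tran_max is the total number of
// sub-transactions expected in the batch, tran_count is the number reserved
// into buffers so far (updated under lock in as_batch_reserve()), and
// tran_count_response is the number whose responses have been handled
// (updated only by the response worker in as_batch_buffer_end()).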

typedef struct {
	as_batch_shared* shared;
	as_batch_buffer* buffer;
} as_batch_response;

typedef struct {
	cf_queue* response_queue;
	cf_queue* complete_queue;
	cf_atomic32 tran_count;
	uint32_t delay_count;
	volatile bool active;
} as_batch_queue;

typedef struct {
	as_batch_queue* batch_queue;
	bool complete;
} as_batch_work;

//---------------------------------------------------------
// STATIC DATA
//---------------------------------------------------------

static as_thread_pool batch_thread_pool;
static as_buffer_pool batch_buffer_pool;

static as_batch_queue batch_queues[MAX_BATCH_THREADS];
static cf_mutex batch_resize_lock;

//---------------------------------------------------------
// STATIC FUNCTIONS
//---------------------------------------------------------

static int
as_batch_send_error(as_transaction* btr, int result_code)
{
	// Send error back to client for a batch transaction that failed before
	// placing any sub-transactions on the transaction queue.
	cl_msg m;
	m.proto.version = PROTO_VERSION;
	m.proto.type = PROTO_TYPE_AS_MSG;
	m.proto.sz = sizeof(as_msg);
	as_proto_swap(&m.proto);
	m.msg.header_sz = sizeof(as_msg);
	m.msg.info1 = 0;
	m.msg.info2 = 0;
	m.msg.info3 = AS_MSG_INFO3_LAST;
	m.msg.unused = 0;
	m.msg.result_code = result_code;
	m.msg.generation = 0;
	m.msg.record_ttl = 0;
	m.msg.transaction_ttl = 0;
	m.msg.n_fields = 0;
	m.msg.n_ops = 0;
	as_msg_swap_header(&m.msg);

	cf_socket* sock = &btr->from.proto_fd_h->sock;
	int status = 0;

	// Use blocking send because the error occurred before the batch
	// transaction was placed on a batch queue.
	if (cf_socket_send_all(sock, (uint8_t*)&m, sizeof(m), MSG_NOSIGNAL, CF_SOCKET_TIMEOUT) < 0) {
		// Common when a client aborts.
		cf_debug(AS_BATCH, "Batch send response error, errno %d fd %d", errno, CSFD(sock));
		status = -1;
	}

	as_end_of_transaction(btr->from.proto_fd_h, status != 0);
	btr->from.proto_fd_h = NULL;

	cf_free(btr->msgp);
	btr->msgp = NULL;

	if (result_code == AS_ERR_TIMEOUT) {
		cf_atomic64_incr(&g_stats.batch_index_timeout);
	}
	else {
		cf_atomic64_incr(&g_stats.batch_index_errors);
	}

	return status;
}

static int
as_batch_send_buffer(as_batch_shared* shared, as_batch_buffer* buffer, int32_t flags)
{
	cf_socket* sock = &shared->fd_h->sock;
	uint8_t* buf = (uint8_t*)&buffer->proto;
	size_t size = sizeof(as_proto) + buffer->size;
	size_t off = shared->buffer_offset;
	size_t remaining = size - off;

	ssize_t sent = cf_socket_try_send_all(sock, buf + off, remaining, flags);

	if (sent < 0) {
		shared->bad_response_fd = true;
		return BATCH_ERROR;
	}

	if ((size_t)sent < remaining) {
		shared->buffer_offset += sent;
		return BATCH_DELAY;
	}

	return BATCH_SUCCESS;
}

static inline void
as_batch_release_buffer(as_batch_shared* shared, as_batch_buffer* buffer)
{
	if (as_buffer_pool_push_limit(&batch_buffer_pool, buffer, buffer->capacity,
			g_config.batch_max_unused_buffers) != 0) {
		// The push frees the buffer on failure, so just increment the stat.
		cf_atomic64_incr(&g_stats.batch_index_destroyed_buffers);
	}
}

static void
as_batch_complete(as_batch_queue* queue, as_batch_shared* shared, int status)
{
	as_end_of_transaction(shared->fd_h, status != 0);
	shared->fd_h = NULL;

	// For now the model is that timeouts don't appear in histograms.
	if (shared->result_code != AS_ERR_TIMEOUT) {
		G_HIST_ACTIVATE_INSERT_DATA_POINT(batch_index_hist, shared->start);
	}

	// Check return code in order to update statistics.
	if (status == 0 && shared->result_code == 0) {
		cf_atomic64_incr(&g_stats.batch_index_complete);
	}
	else {
		if (shared->result_code == AS_ERR_TIMEOUT) {
			cf_atomic64_incr(&g_stats.batch_index_timeout);
		}
		else {
			cf_atomic64_incr(&g_stats.batch_index_errors);
		}
	}

	// Destroy lock.
	cf_mutex_destroy(&shared->lock);

	// Release memory.
	predexp_destroy(shared->predexp);
	cf_free(shared->msgp);
	cf_free(shared);

	// It's critical that this count is decremented after the transaction is
	// completely finished with the queue, because the shutdown threads rely
	// on this information when performing graceful shutdown.
	cf_atomic32_decr(&queue->tran_count);
}

static bool
as_batch_send_trailer(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer)
{
	// Use existing buffer to store the trailer message.
	as_proto* proto = &buffer->proto;
	proto->version = PROTO_VERSION;
	proto->type = PROTO_TYPE_AS_MSG;
	proto->sz = sizeof(as_msg);
	as_proto_swap(proto);

	as_msg* msg = (as_msg*)buffer->data;
	msg->header_sz = sizeof(as_msg);
	msg->info1 = 0;
	msg->info2 = 0;
	msg->info3 = AS_MSG_INFO3_LAST;
	msg->unused = 0;
	msg->result_code = shared->result_code;
	msg->generation = 0;
	msg->record_ttl = 0;
	msg->transaction_ttl = 0;
	msg->n_fields = 0;
	msg->n_ops = 0;
	as_msg_swap_header(msg);

	buffer->size = sizeof(as_msg);
	shared->buffer_offset = 0;

	int status = as_batch_send_buffer(shared, buffer, MSG_NOSIGNAL);

	if (status == BATCH_DELAY) {
		return false;
	}

	as_batch_release_buffer(shared, buffer);
	as_batch_complete(queue, shared, status);

	return true;
}

static inline bool
as_batch_buffer_end(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer, int status)
{
	// If we're invoked for the trailer, we're done. Free buffer and batch.
	if (shared->in_trailer) {
		as_batch_release_buffer(shared, buffer);
		as_batch_complete(queue, shared, status);
		return true;
	}

	shared->tran_count_response += buffer->tran_count;

	// We haven't yet reached the last buffer. Free buffer.
	if (shared->tran_count_response < shared->tran_max) {
		as_batch_release_buffer(shared, buffer);
		return true;
	}

	// We've reached the last buffer. If we cannot send a trailer, then we're
	// done. Free buffer and batch.
	if (shared->bad_response_fd) {
		as_batch_release_buffer(shared, buffer);
		as_batch_complete(queue, shared, status);
		return true;
	}

	// Reuse the last buffer for the trailer.
	shared->in_trailer = true;

	return as_batch_send_trailer(queue, shared, buffer);
}

static inline bool
as_batch_abandon(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer)
{
	if (cf_getns() >= shared->end) {
		cf_warning(AS_BATCH, "abandoned batch from %s with %u transactions after %lu ms",
				shared->fd_h->client, shared->tran_max,
				(shared->end - shared->start) / 1000000);
		shared->bad_response_fd = true;
		as_batch_buffer_end(queue, shared, buffer, BATCH_ERROR);
		return true;
	}

	return false;
}

static bool
as_batch_send_delayed(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer)
{
	// If we get here, other buffers in this batch can't have affected or
	// reacted to this batch's status. All that happened is this buffer was
	// delayed. Therefore, we don't need to check for error conditions.

	int status = as_batch_send_buffer(shared, buffer, MSG_NOSIGNAL | MSG_MORE);

	if (status == BATCH_DELAY) {
		return false;
	}

	return as_batch_buffer_end(queue, shared, buffer, status);
}

static bool
as_batch_send_response(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer)
{
	cf_assert(buffer->capacity != 0, AS_BATCH, "buffer capacity 0");

	// Don't send buffer if an error has already occurred.
	if (shared->bad_response_fd || shared->result_code) {
		return as_batch_buffer_end(queue, shared, buffer, BATCH_ERROR);
	}

	shared->buffer_offset = 0;

	// Send buffer block to client socket.
	buffer->proto.version = PROTO_VERSION;
	buffer->proto.type = PROTO_TYPE_AS_MSG;
	buffer->proto.sz = buffer->size;
	as_proto_swap(&buffer->proto);

	int status = as_batch_send_buffer(shared, buffer, MSG_NOSIGNAL | MSG_MORE);

	if (status == BATCH_DELAY) {
		return false;
	}

	return as_batch_buffer_end(queue, shared, buffer, status);
}

static inline void
as_batch_delay_buffer(as_batch_queue* queue)
{
	cf_atomic64_incr(&g_stats.batch_index_delay);

	// If all batch transactions on this thread are delayed, avoid a tight
	// loop.
	if (queue->tran_count == queue->delay_count) {
		pthread_yield(); // not cf_thread_yield() - we're using as_thread_pool
	}
}
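
// Response worker loop. Each worker owns one response queue. Responses are
// popped and their buffers sent to the client. If the socket would block, the
// buffer is remembered as the batch's delayed_buffer and the response is
// pushed back onto the queue. While a batch is delayed, only its original
// delayed buffer is retried - responses for the batch's other buffers are
// re-queued untouched, preserving buffer order within the batch.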

static void
as_batch_worker(void* udata)
{
	// Send batch data to client, one buffer block at a time.
	as_batch_work* work = (as_batch_work*)udata;
	as_batch_queue* batch_queue = work->batch_queue;
	cf_queue* response_queue = batch_queue->response_queue;
	as_batch_response response;
	as_batch_shared* shared;
	as_batch_buffer* buffer;

	while (cf_queue_pop(response_queue, &response, CF_QUEUE_FOREVER) == CF_QUEUE_OK) {
		// Check if this thread task should end.
		shared = response.shared;

		if (! shared) {
			break;
		}

		buffer = response.buffer;

		if (! shared->delayed_buffer) {
			if (as_batch_send_response(batch_queue, shared, buffer)) {
				continue;
			}

			if (as_batch_abandon(batch_queue, shared, buffer)) {
				continue;
			}

			// Socket blocked.
			shared->delayed_buffer = buffer;
			batch_queue->delay_count++;
			as_batch_delay_buffer(batch_queue);
		}
		else {
			// Batch is delayed - try only the original delayed buffer.
			if (shared->delayed_buffer == buffer) {
				shared->delayed_buffer = NULL;

				if (as_batch_send_delayed(batch_queue, shared, buffer)) {
					batch_queue->delay_count--;
					continue;
				}

				if (as_batch_abandon(batch_queue, shared, buffer)) {
					batch_queue->delay_count--;
					continue;
				}

				// Socket blocked again.
				shared->delayed_buffer = buffer;
				as_batch_delay_buffer(batch_queue);
			}
			// else - delayed by another buffer in this batch, just re-queue.
		}

		cf_queue_push(response_queue, &response);
	}

	// Send back completion notification.
	uint32_t complete = 1;
	cf_queue_push(work->batch_queue->complete_queue, &complete);
}

static int
as_batch_create_thread_queues(uint32_t begin, uint32_t end)
{
	// Allocate one queue per batch response worker thread.
	int status = 0;

	as_batch_work work;
	work.complete = false;

	for (uint32_t i = begin; i < end; i++) {
		work.batch_queue = &batch_queues[i];
		work.batch_queue->response_queue = cf_queue_create(sizeof(as_batch_response), true);
		work.batch_queue->complete_queue = cf_queue_create(sizeof(uint32_t), true);
		work.batch_queue->tran_count = 0;
		work.batch_queue->delay_count = 0;
		work.batch_queue->active = true;

		int rc = as_thread_pool_queue_task_fixed(&batch_thread_pool, &work);

		if (rc) {
			cf_warning(AS_BATCH, "Failed to create batch thread %u: %d", i, rc);
			status = rc;
		}
	}

	return status;
}

static bool
as_batch_wait(uint32_t begin, uint32_t end)
{
	for (uint32_t i = begin; i < end; i++) {
		if (batch_queues[i].tran_count > 0) {
			return false;
		}
	}

	return true;
}

static int
as_batch_shutdown_thread_queues(uint32_t begin, uint32_t end)
{
	// Set excess queues to inactive. Existing batch transactions will be
	// allowed to complete.
	for (uint32_t i = begin; i < end; i++) {
		batch_queues[i].active = false;
	}

	// Wait until there are no more active batch transactions on the queues.
	// Time out after 30 seconds.
	uint64_t limitus = cf_getus() + (1000 * 1000 * 30);

	usleep(50 * 1000); // sleep 50ms

	do {
		if (as_batch_wait(begin, end)) {
			break;
		}

		usleep(500 * 1000); // sleep 500ms

		if (cf_getus() > limitus) {
			cf_warning(AS_BATCH, "Batch shutdown threads failed on timeout. Transactions remain on queue.");
			// Reactivate queues.
			for (uint32_t i = begin; i < end; i++) {
				batch_queues[i].active = true;
			}
			return -1;
		}
	} while (true);

	// Send stop command to excess queues.
	as_batch_response response;
	memset(&response, 0, sizeof(as_batch_response));

	for (uint32_t i = begin; i < end; i++) {
		cf_queue_push(batch_queues[i].response_queue, &response);
	}

	// Wait for completion events.
	uint32_t complete;

	for (uint32_t i = begin; i < end; i++) {
		as_batch_queue* bq = &batch_queues[i];
		cf_queue_pop(bq->complete_queue, &complete, CF_QUEUE_FOREVER);
		cf_queue_destroy(bq->complete_queue);
		bq->complete_queue = NULL;
		cf_queue_destroy(bq->response_queue);
		bq->response_queue = NULL;
	}

	return 0;
}

static as_batch_queue*
as_batch_find_queue(int queue_index)
{
	// Search backwards for an active queue.
	for (int index = queue_index - 1; index >= 0; index--) {
		as_batch_queue* bq = &batch_queues[index];

		if (bq->active && cf_queue_sz(bq->response_queue) < g_config.batch_max_buffers_per_queue) {
			return bq;
		}
	}

	// Search forwards.
	for (int index = queue_index + 1; index < MAX_BATCH_THREADS; index++) {
		as_batch_queue* bq = &batch_queues[index];

		// If the current queue is not active, later queues will not be active
		// either.
		if (! bq->active) {
			break;
		}

		if (cf_queue_sz(bq->response_queue) < g_config.batch_max_buffers_per_queue) {
			return bq;
		}
	}

	return NULL;
}

static as_batch_buffer*
as_batch_buffer_create(uint32_t size)
{
	as_batch_buffer* buffer = cf_malloc(size);

	buffer->capacity = size - batch_buffer_pool.header_size;
	cf_atomic64_incr(&g_stats.batch_index_created_buffers);

	return buffer;
}

static uint8_t*
as_batch_buffer_pop(as_batch_shared* shared, uint32_t size)
{
	as_batch_buffer* buffer;
	uint32_t mem_size = size + batch_buffer_pool.header_size;

	if (mem_size > batch_buffer_pool.buffer_size) {
		// Requested size is greater than the fixed buffer size. Allocate a
		// new buffer, but don't put it back into the pool.
		buffer = as_batch_buffer_create(mem_size);
		cf_atomic64_incr(&g_stats.batch_index_huge_buffers);
	}
	else {
		// Pop existing buffer from the queue. The extra lock here is
		// unavoidable.
		int status = cf_queue_pop(batch_buffer_pool.queue, &buffer, CF_QUEUE_NOWAIT);

		if (status == CF_QUEUE_OK) {
			buffer->capacity = batch_buffer_pool.buffer_size - batch_buffer_pool.header_size;
		}
		else if (status == CF_QUEUE_EMPTY) {
			// Queue is empty. Create new buffer.
			buffer = as_batch_buffer_create(batch_buffer_pool.buffer_size);
		}
		else {
			cf_crash(AS_BATCH, "Failed to pop new batch buffer: %d", status);
		}
	}

	// Reserve a slot in the new buffer.
	buffer->size = size;
	buffer->tran_count = 1;
	buffer->writers = 2;
	shared->buffer = buffer;

	return buffer->data;
}

static inline void
as_batch_buffer_complete(as_batch_shared* shared, as_batch_buffer* buffer)
{
	// Flush when all writers have finished writing into the buffer.
	if (cf_atomic32_decr(&buffer->writers) == 0) {
		as_batch_response response = { .shared = shared, .buffer = buffer };
		cf_queue_push(shared->response_queue, &response);
	}
}

static uint8_t*
as_batch_reserve(as_batch_shared* shared, uint32_t size, int result_code, as_batch_buffer** buffer_out, bool* complete)
{
	as_batch_buffer* buffer;
	uint8_t* data;

	cf_mutex_lock(&shared->lock);
	*complete = (++shared->tran_count == shared->tran_max);
	buffer = shared->buffer;

	if (! buffer) {
		// No previous buffer. Get a new buffer.
		data = as_batch_buffer_pop(shared, size);
		*buffer_out = shared->buffer;
		cf_mutex_unlock(&shared->lock);
	}
	else if (buffer->size + size <= buffer->capacity) {
		// Result fits into the existing block. Reserve a slot.
		data = buffer->data + buffer->size;
		buffer->size += size;
		buffer->tran_count++;
		cf_atomic32_incr(&buffer->writers);
		*buffer_out = buffer;
		cf_mutex_unlock(&shared->lock);
	}
	else {
		// Result does not fit into the existing block. Keep a reference to
		// the existing buffer so its writer count can be released ...
		as_batch_buffer* prev_buffer = buffer;

		// ... and get a new buffer.
		data = as_batch_buffer_pop(shared, size);
		*buffer_out = shared->buffer;
		cf_mutex_unlock(&shared->lock);

		as_batch_buffer_complete(shared, prev_buffer);
	}

	if (! (result_code == AS_OK || result_code == AS_ERR_NOT_FOUND ||
			result_code == AS_ERR_FILTERED_OUT)) {
		// Result code can be set outside the lock because it doesn't matter
		// which transaction's result code is used, as long as it's an error.
		shared->result_code = result_code;
	}

	return data;
}

static inline void
as_batch_transaction_end(as_batch_shared* shared, as_batch_buffer* buffer, bool complete)
{
	// This flush can only be triggered when the buffer is full.
	as_batch_buffer_complete(shared, buffer);

	if (complete) {
		// This flush only occurs when all transactions in the batch have
		// been processed.
		as_batch_buffer_complete(shared, buffer);
	}
}
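
// Typical usage by the as_batch_add_*() entry points below - reserve space,
// write the response into it, then end the sub-transaction:
//
//	as_batch_buffer* buffer;
//	bool complete;
//	uint8_t* data = as_batch_reserve(shared, size, result_code,
//			&buffer, &complete);
//
//	if (data) {
//		// ... write the as_msg response into data ...
//	}
//
//	as_batch_transaction_end(shared, buffer, complete);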

static void
as_batch_terminate(as_batch_shared* shared, uint32_t tran_count, int result_code)
{
	// Terminate the batch by adding phantom transactions to the shared and
	// buffer tran counts. This is done so the memory is released at the end,
	// exactly once.
	as_batch_buffer* buffer;
	bool complete;

	cf_mutex_lock(&shared->lock);
	buffer = shared->buffer;
	shared->result_code = result_code;
	shared->tran_count += tran_count;
	complete = (shared->tran_count == shared->tran_max);

	if (! buffer) {
		// No previous buffer. Get a new buffer.
		as_batch_buffer_pop(shared, 0);
		buffer = shared->buffer;
		buffer->tran_count = tran_count; // override tran_count
	}
	else {
		// Buffer exists. Add phantom transactions.
		buffer->tran_count += tran_count;
		cf_atomic32_incr(&buffer->writers);
	}

	cf_mutex_unlock(&shared->lock);
	as_batch_transaction_end(shared, buffer, complete);
}

//---------------------------------------------------------
// FUNCTIONS
//---------------------------------------------------------

int
as_batch_init()
{
	cf_mutex_init(&batch_resize_lock);

	// Default 'batch-index-threads' can't be set before the call to
	// cf_topo_init().
	if (g_config.n_batch_index_threads == 0) {
		g_config.n_batch_index_threads = cf_topo_count_cpus();
	}

	cf_info(AS_BATCH, "starting %u batch-index-threads", g_config.n_batch_index_threads);

	int rc = as_thread_pool_init_fixed(&batch_thread_pool, g_config.n_batch_index_threads, as_batch_worker,
			sizeof(as_batch_work), offsetof(as_batch_work, complete));

	if (rc) {
		cf_warning(AS_BATCH, "Failed to initialize batch-index-threads to %u: %d", g_config.n_batch_index_threads, rc);
		return rc;
	}

	rc = as_buffer_pool_init(&batch_buffer_pool, sizeof(as_batch_buffer), BATCH_BLOCK_SIZE);

	if (rc) {
		cf_warning(AS_BATCH, "Failed to initialize batch buffer pool: %d", rc);
		return rc;
	}

	rc = as_batch_create_thread_queues(0, g_config.n_batch_index_threads);

	if (rc) {
		return rc;
	}

	return 0;
}

int
as_batch_queue_task(as_transaction* btr)
{
	uint64_t counter = cf_atomic64_incr(&g_stats.batch_index_initiate);
	uint32_t thread_size = batch_thread_pool.thread_size;

	if (thread_size == 0 || thread_size > MAX_BATCH_THREADS) {
		cf_warning(AS_BATCH, "batch-index-threads has been disabled: %u", thread_size);
		return as_batch_send_error(btr, AS_ERR_BATCH_DISABLED);
	}

	uint32_t queue_index = counter % thread_size;

	// Validate batch transaction.
	as_proto* bproto = &btr->msgp->proto;

	if (bproto->sz > PROTO_SIZE_MAX) {
		cf_warning(AS_BATCH, "can't process message: invalid size %lu should be %d or less",
				(uint64_t)bproto->sz, PROTO_SIZE_MAX);
		return as_batch_send_error(btr, AS_ERR_PARAMETER);
	}

	if (bproto->type != PROTO_TYPE_AS_MSG) {
		cf_warning(AS_BATCH, "Invalid proto type. Expected %d Received %d", PROTO_TYPE_AS_MSG, bproto->type);
		return as_batch_send_error(btr, AS_ERR_PARAMETER);
	}

	// Check that the socket is authenticated.
	uint8_t result = as_security_check(btr->from.proto_fd_h, PERM_NONE);

	if (result != AS_OK) {
		as_security_log(btr->from.proto_fd_h, result, PERM_NONE, NULL, NULL);
		return as_batch_send_error(btr, result);
	}

	// Parse header.
	as_msg* bmsg = &btr->msgp->msg;
	as_msg_swap_header(bmsg);

	// Parse fields.
	uint8_t* limit = (uint8_t*)bmsg + bproto->sz;
	as_msg_field* mf = (as_msg_field*)bmsg->data;
	as_msg_field* end;
	as_msg_field* bf = NULL;
	as_msg_field* predexp_mf = NULL;

	for (int i = 0; i < bmsg->n_fields; i++) {
		if ((uint8_t*)mf >= limit) {
			cf_warning(AS_BATCH, "Batch field limit reached");
			return as_batch_send_error(btr, AS_ERR_PARAMETER);
		}

		as_msg_swap_field(mf);
		end = as_msg_field_get_next(mf);

		if (mf->type == AS_MSG_FIELD_TYPE_BATCH || mf->type == AS_MSG_FIELD_TYPE_BATCH_WITH_SET) {
			bf = mf;
		}
		else if (mf->type == AS_MSG_FIELD_TYPE_PREDEXP) {
			predexp_mf = mf;
		}

		mf = end;
	}

	if (! bf) {
		cf_warning(AS_BATCH, "Batch index field not found");
		return as_batch_send_error(btr, AS_ERR_PARAMETER);
	}

	// Parse batch field.
	uint8_t* data = bf->data;
	uint32_t tran_count = cf_swap_from_be32(*(uint32_t*)data);

	data += sizeof(uint32_t);

	if (tran_count == 0) {
		cf_warning(AS_BATCH, "Batch request size is zero");
		return as_batch_send_error(btr, AS_ERR_PARAMETER);
	}

	if (tran_count > g_config.batch_max_requests) {
		cf_warning(AS_BATCH, "Batch request size %u exceeds max %u", tran_count, g_config.batch_max_requests);
		return as_batch_send_error(btr, AS_ERR_BATCH_MAX_REQUESTS);
	}

	// Initialize shared data.
	as_batch_shared* shared = cf_malloc(sizeof(as_batch_shared));

	memset(shared, 0, sizeof(as_batch_shared));

	cf_mutex_init(&shared->lock);

	// Abandon the batch at twice the batch timeout if a batch timeout is
	// defined. If the batch timeout is zero, abandon after 30 seconds.
	shared->end = btr->start_time + ((bmsg->transaction_ttl != 0) ?
			((uint64_t)bmsg->transaction_ttl * 1000 * 1000 * 2) :
			BATCH_ABANDON_LIMIT);

	shared->start = btr->start_time;
	shared->fd_h = btr->from.proto_fd_h;
	shared->msgp = btr->msgp;
	shared->tran_max = tran_count;

	// Find a batch queue to send transaction responses to.
	as_batch_queue* batch_queue = &batch_queues[queue_index];

	// batch_max_buffers_per_queue is a soft limit, but still must be checked
	// under lock.
	if (! (batch_queue->active && cf_queue_sz(batch_queue->response_queue) < g_config.batch_max_buffers_per_queue)) {
		// Queue buffer limit has been exceeded or thread has been shut down
		// (probably due to a downwards thread resize). Search for an
		// available queue.
		// cf_warning(AS_BATCH, "Queue %u full %d", queue_index, cf_queue_sz(batch_queue->response_queue));
		batch_queue = as_batch_find_queue(queue_index);

		if (! batch_queue) {
			cf_warning(AS_BATCH, "Failed to find active batch queue that is not full");
			cf_free(shared);
			return as_batch_send_error(btr, AS_ERR_BATCH_QUEUES_FULL);
		}
	}

	if (predexp_mf != NULL) {
		shared->predexp_mf = predexp_mf;

		if ((shared->predexp = predexp_build(predexp_mf)) == NULL) {
			cf_warning(AS_BATCH, "Failed to build batch predexp");
			cf_free(shared);
			return as_batch_send_error(btr, AS_ERR_PARAMETER);
		}
	}

	// Increment batch queue transaction count.
	cf_atomic32_incr(&batch_queue->tran_count);
	shared->response_queue = batch_queue->response_queue;

	// Initialize generic transaction.
	as_transaction tr;
	as_transaction_init_head(&tr, 0, 0);

	tr.origin = FROM_BATCH;
	tr.from_flags |= FROM_FLAG_BATCH_SUB;
	tr.start_time = btr->start_time;

	// Read batch keys and initialize generic transactions.
	as_batch_input* in;
	cl_msg* out = NULL;
	cl_msg* prev_msgp = NULL;
	as_msg_op* op;
	uint32_t tran_row = 0;
	uint8_t info = *data++; // allow transaction inline

	bool allow_inline = (g_config.n_namespaces_inlined != 0 && info);
	bool check_inline = (allow_inline && g_config.n_namespaces_not_inlined != 0);
	bool should_inline = (allow_inline && g_config.n_namespaces_not_inlined == 0);
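
	// Inlining policy: the client's info byte says whether inline execution
	// is allowed at all. If it is, and every namespace is configured for
	// data-in-memory storage, sub-transactions are always processed inline;
	// if only some namespaces are, each row's namespace is checked in the
	// loop below (check_inline).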

	// Split batch rows into separate single record read transactions. The
	// read transactions are located in the same memory block as the original
	// batch transactions. This allows us to avoid performing an extra malloc
	// for each transaction.
	while (tran_row < tran_count && data + BATCH_REPEAT_SIZE <= limit) {
		// Copy transaction data before memory gets overwritten.
		in = (as_batch_input*)data;

		tr.from.batch_shared = shared; // is set NULL after sub-transaction
		tr.from_data.batch_index = cf_swap_from_be32(in->index);
		tr.keyd = in->keyd;
		tr.benchmark_time = btr->benchmark_time; // must reset for each usage

		if (in->repeat) {
			if (! prev_msgp) {
				break; // bad bytes from client - repeat set on first item
			}

			// Row should use previous namespace and bin names.
			data += BATCH_REPEAT_SIZE;
			tr.msgp = prev_msgp;
		}
		else {
			tr.msg_fields = 0; // erase previous AS_MSG_FIELD_BIT_SET flag, if any
			as_transaction_set_msg_field_flag(&tr, AS_MSG_FIELD_TYPE_NAMESPACE);

			// Row contains full namespace/bin names.
			out = (cl_msg*)data;

			if (data + sizeof(cl_msg) + sizeof(as_msg_field) > limit) {
				break;
			}

			out->msg.header_sz = sizeof(as_msg);
			out->msg.info1 = in->info1;
			out->msg.info2 = 0;
			out->msg.info3 = bmsg->info3 &
					(AS_MSG_INFO3_SC_READ_RELAX | AS_MSG_INFO3_SC_READ_TYPE);
			out->msg.unused = 0;
			out->msg.result_code = 0;
			out->msg.generation = 0;
			out->msg.record_ttl = 0;
			out->msg.transaction_ttl = bmsg->transaction_ttl; // already swapped
			// n_fields/n_ops are in the exact same place on both input and
			// output, but the values still need to be swapped.
			out->msg.n_fields = cf_swap_from_be16(in->n_fields);

			// Older clients sent zero, but always sent the namespace. Adjust
			// this.
			if (out->msg.n_fields == 0) {
				out->msg.n_fields = 1;
			}

			out->msg.n_ops = cf_swap_from_be16(in->n_ops);

			// Namespace input is the same as the namespace field, so just
			// leave it in place and swap.
			data += sizeof(cl_msg);
			mf = (as_msg_field*)data;
			as_msg_swap_field(mf);

			if (check_inline) {
				as_namespace* ns = as_namespace_get_bymsgfield(mf);
				should_inline = ns && ns->storage_data_in_memory;
			}

			mf = as_msg_field_get_next(mf);
			data = (uint8_t*)mf;

			// Swap remaining fields.
			for (uint16_t j = 1; j < out->msg.n_fields; j++) {
				if (data + sizeof(as_msg_field) > limit) {
					goto TranEnd;
				}

				if (mf->type == AS_MSG_FIELD_TYPE_SET) {
					as_transaction_set_msg_field_flag(&tr, AS_MSG_FIELD_TYPE_SET);
				}

				as_msg_swap_field(mf);
				mf = as_msg_field_get_next(mf);
				data = (uint8_t*)mf;
			}

			if (out->msg.n_ops) {
				// Bin names input is the same as transaction ops, so just
				// leave them in place and swap.
				uint16_t n_ops = out->msg.n_ops;

				for (uint16_t j = 0; j < n_ops; j++) {
					if (data + sizeof(as_msg_op) > limit) {
						goto TranEnd;
					}

					op = (as_msg_op*)data;
					as_msg_swap_op(op);
					op = as_msg_op_get_next(op);
					data = (uint8_t*)op;
				}
			}

			// Initialize msg header.
			out->proto.version = PROTO_VERSION;
			out->proto.type = PROTO_TYPE_AS_MSG;
			out->proto.sz = (data - (uint8_t*)&out->msg);
			tr.msgp = out;
			prev_msgp = out;
		}

		if (data > limit) {
			break;
		}

		// Submit the transaction.
		if (should_inline) {
			as_tsvc_process_transaction(&tr);
		}
		else {
			// Queue transaction to be processed by a transaction thread.
			as_service_enqueue_internal(&tr);
		}

		tran_row++;
	}

TranEnd:
	if (tran_row < tran_count) {
		// Mismatch between tran_count and actual data. Terminate the batch.
		cf_warning(AS_BATCH, "Batch keys mismatch. Expected %u Received %u", tran_count, tran_row);
		as_batch_terminate(shared, tran_count - tran_row, AS_ERR_PARAMETER);
	}

	// Reset the original socket because the socket is now owned by batch
	// shared.
	btr->from.proto_fd_h = NULL;

	return 0;
}

void
as_batch_add_result(as_transaction* tr, uint16_t n_bins, as_bin** bins,
		as_msg_op** ops)
{
	as_namespace* ns = tr->rsv.ns;

	// Calculate size.
	size_t size = sizeof(as_msg);
	size += sizeof(as_msg_field) + sizeof(cf_digest);

	uint16_t n_fields = 1;

	for (uint16_t i = 0; i < n_bins; i++) {
		as_bin* bin = bins[i];
		size += sizeof(as_msg_op);

		if (ops) {
			size += ops[i]->name_sz;
		}
		else if (bin) {
			size += ns->single_bin ? 0 : strlen(as_bin_get_name_from_id(ns, bin->id));
		}
		else {
			cf_crash(AS_BATCH, "making response message with null bin and op");
		}

		if (bin) {
			size += as_bin_particle_client_value_size(bin);
		}
	}

	as_batch_shared* shared = tr->from.batch_shared;

	as_batch_buffer* buffer;
	bool complete;
	uint8_t* data = as_batch_reserve(shared, size, tr->result_code, &buffer, &complete);

	if (data) {
		// Write header.
		uint8_t* p = data;
		as_msg* m = (as_msg*)p;
		m->header_sz = sizeof(as_msg);
		m->info1 = 0;
		m->info2 = 0;
		m->info3 = 0;
		m->unused = 0;
		m->result_code = tr->result_code;
		m->generation = plain_generation(tr->generation, ns);
		m->record_ttl = tr->void_time;

		// Overload transaction_ttl to store the batch index.
		m->transaction_ttl = tr->from_data.batch_index;

		m->n_fields = n_fields;
		m->n_ops = n_bins;
		as_msg_swap_header(m);
		p += sizeof(as_msg);

		as_msg_field* field = (as_msg_field*)p;
		field->field_sz = sizeof(cf_digest) + 1;
		field->type = AS_MSG_FIELD_TYPE_DIGEST_RIPE;
		memcpy(field->data, &tr->keyd, sizeof(cf_digest));
		as_msg_swap_field(field);
		p += sizeof(as_msg_field) + sizeof(cf_digest);

		for (uint16_t i = 0; i < n_bins; i++) {
			as_bin* bin = bins[i];
			as_msg_op* op = (as_msg_op*)p;
			op->op = AS_MSG_OP_READ;
			op->version = 0;

			if (ops) {
				as_msg_op* src = ops[i];
				memcpy(op->name, src->name, src->name_sz);
				op->name_sz = src->name_sz;
			}
			else {
				op->name_sz = as_bin_memcpy_name(ns, op->name, bin);
			}

			op->op_sz = OP_FIXED_SZ + op->name_sz;
			p += sizeof(as_msg_op) + op->name_sz;
			p += as_bin_particle_to_client(bin, op);
			as_msg_swap_op(op);
		}
	}

	as_batch_transaction_end(shared, buffer, complete);
}

void
as_batch_add_proxy_result(as_batch_shared* shared, uint32_t index, cf_digest* digest, cl_msg* cmsg, size_t proxy_size)
{
	as_msg* msg = &cmsg->msg;
	size_t size = proxy_size + sizeof(as_msg_field) + sizeof(cf_digest) - sizeof(as_proto);

	as_batch_buffer* buffer;
	bool complete;
	uint8_t* data = as_batch_reserve(shared, size, msg->result_code, &buffer, &complete);

	if (data) {
		// Overload transaction_ttl to store the batch index.
		msg->transaction_ttl = htonl(index);

		// Write header.
		uint16_t n_fields = ntohs(msg->n_fields);
		msg->n_fields = htons(n_fields + 1);
		memcpy(data, msg, sizeof(as_msg));
		uint8_t* trg = data + sizeof(as_msg);

		// Write digest field.
		as_msg_field* field = (as_msg_field*)trg;
		field->field_sz = sizeof(cf_digest) + 1;
		field->type = AS_MSG_FIELD_TYPE_DIGEST_RIPE;
		memcpy(field->data, digest, sizeof(cf_digest));
		as_msg_swap_field(field);
		trg += sizeof(as_msg_field) + sizeof(cf_digest);

		// Copy other fields and ops.
		size = ((uint8_t*)cmsg + proxy_size) - msg->data;
		memcpy(trg, msg->data, size);
	}

	as_batch_transaction_end(shared, buffer, complete);
}

void
as_batch_add_error(as_batch_shared* shared, uint32_t index, int result_code)
{
	as_batch_buffer* buffer;
	bool complete;
	uint8_t* data = as_batch_reserve(shared, sizeof(as_msg), result_code, &buffer, &complete);

	if (data) {
		// Write error.
		as_msg* m = (as_msg*)data;
		m->header_sz = sizeof(as_msg);
		m->info1 = 0;
		m->info2 = 0;
		m->info3 = 0;
		m->unused = 0;
		m->result_code = result_code;
		m->generation = 0;
		m->record_ttl = 0;
		// Overload transaction_ttl to store the batch index.
		m->transaction_ttl = index;
		m->n_fields = 0;
		m->n_ops = 0;
		as_msg_swap_header(m);
	}

	as_batch_transaction_end(shared, buffer, complete);
}

int
as_batch_threads_resize(uint32_t threads)
{
	if (threads > MAX_BATCH_THREADS) {
		cf_warning(AS_BATCH, "batch-index-threads %u exceeds max %u", threads, MAX_BATCH_THREADS);
		return -1;
	}

	cf_mutex_lock(&batch_resize_lock);

	// Resize the thread pool. The threads will wait for graceful shutdown on
	// a downwards resize.
	uint32_t threads_orig = batch_thread_pool.thread_size;

	cf_info(AS_BATCH, "Resize batch-index-threads from %u to %u", threads_orig, threads);

	int status = 0;

	if (threads != threads_orig) {
		if (threads > threads_orig) {
			// Increase threads before initializing queues.
			status = as_thread_pool_resize(&batch_thread_pool, threads);

			if (status == 0) {
				g_config.n_batch_index_threads = threads;
				// Adjust queues to match the new thread size.
				status = as_batch_create_thread_queues(threads_orig, threads);
			}
			else {
				// Log a warning, but keep going, as some threads may have
				// been successfully added.
				cf_warning(AS_BATCH, "Failed to resize batch-index-threads. status=%d, batch-index-threads=%u",
						status, g_config.n_batch_index_threads);
				threads = batch_thread_pool.thread_size;

				if (threads > threads_orig) {
					g_config.n_batch_index_threads = threads;
					// Adjust queues to match the new thread size.
					status = as_batch_create_thread_queues(threads_orig, threads);
				}
			}
		}
		else {
			// Shut down queues before shutting down threads.
			status = as_batch_shutdown_thread_queues(threads, threads_orig);

			if (status == 0) {
				// Adjust threads to match the new queue size.
				status = as_thread_pool_resize(&batch_thread_pool, threads);
				g_config.n_batch_index_threads = batch_thread_pool.thread_size;

				if (status) {
					cf_warning(AS_BATCH, "Failed to resize batch-index-threads. status=%d, batch-index-threads=%u",
							status, g_config.n_batch_index_threads);
				}
			}
		}
	}

	cf_mutex_unlock(&batch_resize_lock);

	return status;
}

void
as_batch_queues_info(cf_dyn_buf* db)
{
	cf_mutex_lock(&batch_resize_lock);

	uint32_t max = batch_thread_pool.thread_size;

	for (uint32_t i = 0; i < max; i++) {
		if (i > 0) {
			cf_dyn_buf_append_char(db, ',');
		}

		as_batch_queue* bq = &batch_queues[i];

		cf_dyn_buf_append_uint32(db, bq->tran_count); // batch count
		cf_dyn_buf_append_char(db, ':');
		cf_dyn_buf_append_int(db, cf_queue_sz(bq->response_queue)); // buffer count
	}

	cf_mutex_unlock(&batch_resize_lock);
}

int
as_batch_unused_buffers()
{
	return cf_queue_sz(batch_buffer_pool.queue);
}

// Not currently called. Kept as a placeholder in case the server implements
// clean shutdown in the future.
void
as_batch_destroy()
{
	as_thread_pool_destroy(&batch_thread_pool);
	as_buffer_pool_destroy(&batch_buffer_pool);

	cf_mutex_lock(&batch_resize_lock);
	as_batch_shutdown_thread_queues(0, batch_thread_pool.thread_size);
	cf_mutex_unlock(&batch_resize_lock);
	cf_mutex_destroy(&batch_resize_lock);
}

as_file_handle*
as_batch_get_fd_h(as_batch_shared* shared)
{
	return shared->fd_h;
}

as_msg_field*
as_batch_get_predexp_mf(as_batch_shared* shared)
{
	return shared->predexp_mf;
}

predexp_eval_t*
as_batch_get_predexp(as_batch_shared* shared)
{
	return shared->predexp;
}
| 1308 | |