/*
 * batch.c
 *
 * Copyright (C) 2012-2015 Aerospike, Inc.
 *
 * Portions may be licensed to Aerospike, Inc. under one or more contributor
 * license agreements.
 *
 * This program is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Affero General Public License as published by the Free
 * Software Foundation, either version 3 of the License, or (at your option) any
 * later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
 * details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see http://www.gnu.org/licenses/
 */
#include "base/batch.h"
#include "aerospike/as_buffer_pool.h"
#include "aerospike/as_thread_pool.h"
#include "citrusleaf/alloc.h"
#include "citrusleaf/cf_atomic.h"
#include "citrusleaf/cf_byte_order.h"
#include "citrusleaf/cf_clock.h"
#include "citrusleaf/cf_digest.h"
#include "citrusleaf/cf_queue.h"
#include "base/cfg.h"
#include "base/datamodel.h"
#include "base/index.h"
#include "base/predexp.h"
#include "base/proto.h"
#include "base/security.h"
#include "base/service.h"
#include "base/stats.h"
#include "base/thr_tsvc.h"
#include "base/transaction.h"
#include "cf_mutex.h"
#include "hardware.h"
#include "socket.h"
#include <errno.h>
#include <pthread.h>
#include <unistd.h>

//---------------------------------------------------------
// MACROS
//---------------------------------------------------------

#define BATCH_BLOCK_SIZE (1024 * 128) // 128K
#define BATCH_REPEAT_SIZE 25 // index(4), digest(20) and repeat(1)

#define BATCH_ABANDON_LIMIT (30UL * 1000 * 1000 * 1000) // 30 seconds

#define BATCH_SUCCESS 0
#define BATCH_ERROR -1
#define BATCH_DELAY 1

//---------------------------------------------------------
// TYPES
//---------------------------------------------------------

// Pad batch input header to 30 bytes, which is also the size of a transaction
// header. This allows the input memory to be used as transaction cl_msg
// memory, saving a large number of memory allocations while allowing different
// namespaces/bin-name filters to be in the same batch.
typedef struct {
	uint32_t index;
	cf_digest keyd;
	uint8_t repeat;
	uint8_t info1;
	uint16_t n_fields;
	uint16_t n_ops;
} __attribute__((__packed__)) as_batch_input;
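// Layout check (derived from the fields above): index(4) + digest(20) +
// repeat(1) + info1(1) + n_fields(2) + n_ops(2) = 30 bytes.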

typedef struct {
	uint32_t capacity;
	uint32_t size;
	uint32_t tran_count;
	cf_atomic32 writers;
	as_proto proto;
	uint8_t data[];
} __attribute__((__packed__)) as_batch_buffer;
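// Note on writers (derived from as_batch_buffer_pop() and
// as_batch_buffer_complete() below): the count starts at 2 - one reference for
// the transaction currently writing, one held open for the buffer itself - so
// the buffer is pushed to the response queue exactly once, when the last
// writer finishes.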

struct as_batch_shared_s {
	cf_mutex lock;
	cf_queue* response_queue;
	as_file_handle* fd_h;
	cl_msg* msgp;
	as_batch_buffer* buffer;
	uint64_t start;
	uint64_t end;
	uint32_t tran_count_response;
	uint32_t tran_count;
	uint32_t tran_max;
	uint32_t buffer_offset;
	as_batch_buffer* delayed_buffer;
	int result_code;
	bool in_trailer;
	bool bad_response_fd;
	as_msg_field* predexp_mf;
	predexp_eval_t* predexp;
};

typedef struct {
	as_batch_shared* shared;
	as_batch_buffer* buffer;
} as_batch_response;

typedef struct {
	cf_queue* response_queue;
	cf_queue* complete_queue;
	cf_atomic32 tran_count;
	uint32_t delay_count;
	volatile bool active;
} as_batch_queue;

typedef struct {
	as_batch_queue* batch_queue;
	bool complete;
} as_batch_work;

//---------------------------------------------------------
// STATIC DATA
//---------------------------------------------------------

static as_thread_pool batch_thread_pool;
static as_buffer_pool batch_buffer_pool;

static as_batch_queue batch_queues[MAX_BATCH_THREADS];
static cf_mutex batch_resize_lock;

//---------------------------------------------------------
// STATIC FUNCTIONS
//---------------------------------------------------------

static int
as_batch_send_error(as_transaction* btr, int result_code)
{
	// Send error back to client for batch transaction that failed before
	// placing any sub-transactions on transaction queue.
	cl_msg m;
	m.proto.version = PROTO_VERSION;
	m.proto.type = PROTO_TYPE_AS_MSG;
	m.proto.sz = sizeof(as_msg);
	as_proto_swap(&m.proto);
	m.msg.header_sz = sizeof(as_msg);
	m.msg.info1 = 0;
	m.msg.info2 = 0;
	m.msg.info3 = AS_MSG_INFO3_LAST;
	m.msg.unused = 0;
	m.msg.result_code = result_code;
	m.msg.generation = 0;
	m.msg.record_ttl = 0;
	m.msg.transaction_ttl = 0;
	m.msg.n_fields = 0;
	m.msg.n_ops = 0;
	as_msg_swap_header(&m.msg);

	cf_socket* sock = &btr->from.proto_fd_h->sock;
	int status = 0;

	// Use blocking send because error occurred before batch transaction was
	// placed on batch queue.
	if (cf_socket_send_all(sock, (uint8_t*)&m, sizeof(m), MSG_NOSIGNAL, CF_SOCKET_TIMEOUT) < 0) {
		// Common when a client aborts.
		cf_debug(AS_BATCH, "Batch send response error, errno %d fd %d", errno, CSFD(sock));
		status = -1;
	}

	as_end_of_transaction(btr->from.proto_fd_h, status != 0);
	btr->from.proto_fd_h = NULL;

	cf_free(btr->msgp);
	btr->msgp = 0;

	if (result_code == AS_ERR_TIMEOUT) {
		cf_atomic64_incr(&g_stats.batch_index_timeout);
	}
	else {
		cf_atomic64_incr(&g_stats.batch_index_errors);
	}
	return status;
}

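// Send as much of the buffer as the socket will accept without blocking.
// Returns BATCH_SUCCESS when the whole remainder was sent, BATCH_DELAY when
// the socket would block (shared->buffer_offset records the resume point), or
// BATCH_ERROR on socket failure (which also marks the response fd bad).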
static int
as_batch_send_buffer(as_batch_shared* shared, as_batch_buffer* buffer, int32_t flags)
{
	cf_socket* sock = &shared->fd_h->sock;
	uint8_t* buf = (uint8_t*)&buffer->proto;
	size_t size = sizeof(as_proto) + buffer->size;
	size_t off = shared->buffer_offset;
	size_t remaining = size - off;

	ssize_t sent = cf_socket_try_send_all(sock, buf + off, remaining, flags);

	if (sent < 0) {
		shared->bad_response_fd = true;
		return BATCH_ERROR;
	}

	if ((size_t)sent < remaining) { // sent is non-negative here - safe to cast
		shared->buffer_offset += sent;
		return BATCH_DELAY;
	}

	return BATCH_SUCCESS;
}

static inline void
as_batch_release_buffer(as_batch_shared* shared, as_batch_buffer* buffer)
{
	if (as_buffer_pool_push_limit(&batch_buffer_pool, buffer, buffer->capacity,
			g_config.batch_max_unused_buffers) != 0) {
		// The push frees buffer on failure, so we just increment stat.
		cf_atomic64_incr(&g_stats.batch_index_destroyed_buffers);
	}
}

static void
as_batch_complete(as_batch_queue* queue, as_batch_shared* shared, int status)
{
	as_end_of_transaction(shared->fd_h, status != 0);
	shared->fd_h = NULL;

	// For now the model is timeouts don't appear in histograms.
	if (shared->result_code != AS_ERR_TIMEOUT) {
		G_HIST_ACTIVATE_INSERT_DATA_POINT(batch_index_hist, shared->start);
	}

	// Check return code in order to update statistics.
	if (status == 0 && shared->result_code == 0) {
		cf_atomic64_incr(&g_stats.batch_index_complete);
	}
	else {
		if (shared->result_code == AS_ERR_TIMEOUT) {
			cf_atomic64_incr(&g_stats.batch_index_timeout);
		}
		else {
			cf_atomic64_incr(&g_stats.batch_index_errors);
		}
	}

	// Destroy lock
	cf_mutex_destroy(&shared->lock);

	// Release memory
	predexp_destroy(shared->predexp);
	cf_free(shared->msgp);
	cf_free(shared);

	// It's critical that this count is decremented after the transaction is
	// completely finished with the queue because "shutdown threads" relies
	// on this information when performing graceful shutdown.
	cf_atomic32_decr(&queue->tran_count);
}

static bool
as_batch_send_trailer(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer)
{
	// Use existing buffer to store trailer message.
	as_proto* proto = &buffer->proto;
	proto->version = PROTO_VERSION;
	proto->type = PROTO_TYPE_AS_MSG;
	proto->sz = sizeof(as_msg);
	as_proto_swap(proto);

	as_msg* msg = (as_msg*)buffer->data;
	msg->header_sz = sizeof(as_msg);
	msg->info1 = 0;
	msg->info2 = 0;
	msg->info3 = AS_MSG_INFO3_LAST;
	msg->unused = 0;
	msg->result_code = shared->result_code;
	msg->generation = 0;
	msg->record_ttl = 0;
	msg->transaction_ttl = 0;
	msg->n_fields = 0;
	msg->n_ops = 0;
	as_msg_swap_header(msg);

	buffer->size = sizeof(as_msg);
	shared->buffer_offset = 0;

	int status = as_batch_send_buffer(shared, buffer, MSG_NOSIGNAL);

	if (status == BATCH_DELAY) {
		return false;
	}

	as_batch_release_buffer(shared, buffer);
	as_batch_complete(queue, shared, status);
	return true;
}

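// Account for a finished buffer: free it; if it was the batch's last buffer,
// either send the trailer message or, on a bad socket, complete the batch
// immediately. Returns false only when the trailer send is itself delayed by
// a blocked socket.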
static inline bool
as_batch_buffer_end(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer, int status)
{
	// If we're invoked for the trailer, we're done. Free buffer and batch.
	if (shared->in_trailer) {
		as_batch_release_buffer(shared, buffer);
		as_batch_complete(queue, shared, status);
		return true;
	}

	shared->tran_count_response += buffer->tran_count;

	// We haven't yet reached the last buffer. Free buffer.
	if (shared->tran_count_response < shared->tran_max) {
		as_batch_release_buffer(shared, buffer);
		return true;
	}

	// We've reached the last buffer. If we cannot send a trailer, then we're
	// done. Free buffer and batch.
	if (shared->bad_response_fd) {
		as_batch_release_buffer(shared, buffer);
		as_batch_complete(queue, shared, status);
		return true;
	}

	// Reuse the last buffer for the trailer.
	shared->in_trailer = true;
	return as_batch_send_trailer(queue, shared, buffer);
}

static inline bool
as_batch_abandon(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer)
{
	if (cf_getns() >= shared->end) {
		cf_warning(AS_BATCH, "abandoned batch from %s with %u transactions after %lu ms",
				shared->fd_h->client, shared->tran_max,
				(shared->end - shared->start) / 1000000);
		shared->bad_response_fd = true;
		as_batch_buffer_end(queue, shared, buffer, BATCH_ERROR);
		return true;
	}

	return false;
}

static bool
as_batch_send_delayed(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer)
{
	// If we get here, other buffers in this batch can't have affected or
	// reacted to this batch's status. All that happened is this buffer was
	// delayed. Therefore, we don't need to check for error conditions.

	int status = as_batch_send_buffer(shared, buffer, MSG_NOSIGNAL | MSG_MORE);

	if (status == BATCH_DELAY) {
		return false;
	}

	return as_batch_buffer_end(queue, shared, buffer, status);
}

static bool
as_batch_send_response(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer)
{
	cf_assert(buffer->capacity != 0, AS_BATCH, "buffer capacity 0");

	// Don't send buffer if an error has already occurred.
	if (shared->bad_response_fd || shared->result_code) {
		return as_batch_buffer_end(queue, shared, buffer, BATCH_ERROR);
	}

	shared->buffer_offset = 0;

	// Send buffer block to client socket.
	buffer->proto.version = PROTO_VERSION;
	buffer->proto.type = PROTO_TYPE_AS_MSG;
	buffer->proto.sz = buffer->size;
	as_proto_swap(&buffer->proto);

	int status = as_batch_send_buffer(shared, buffer, MSG_NOSIGNAL | MSG_MORE);

	if (status == BATCH_DELAY) {
		return false;
	}

	return as_batch_buffer_end(queue, shared, buffer, status);
}

static inline void
as_batch_delay_buffer(as_batch_queue* queue)
{
	cf_atomic64_incr(&g_stats.batch_index_delay);

	// If all batch transactions on this thread are delayed, avoid tight loop.
	if (queue->tran_count == queue->delay_count) {
		pthread_yield(); // not cf_thread_yield() - we're using as_thread_pool
	}
}

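// Response worker loop: pop (shared, buffer) pairs and stream them to the
// client. A buffer that can't be fully sent becomes the batch's delayed
// buffer and is re-queued; other buffers of a delayed batch are re-queued
// untouched until the delayed one drains or the batch is abandoned. A pair
// with a NULL shared pointer is the shutdown signal for this worker.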
static void
as_batch_worker(void* udata)
{
	// Send batch data to client, one buffer block at a time.
	as_batch_work* work = (as_batch_work*)udata;
	as_batch_queue* batch_queue = work->batch_queue;
	cf_queue* response_queue = batch_queue->response_queue;
	as_batch_response response;
	as_batch_shared* shared;
	as_batch_buffer* buffer;

	while (cf_queue_pop(response_queue, &response, CF_QUEUE_FOREVER) == CF_QUEUE_OK) {
		// Check if this thread task should end.
		shared = response.shared;
		if (! shared) {
			break;
		}

		buffer = response.buffer;

		if (! shared->delayed_buffer) {
			if (as_batch_send_response(batch_queue, shared, buffer)) {
				continue;
			}

			if (as_batch_abandon(batch_queue, shared, buffer)) {
				continue;
			}

			// Socket blocked.
			shared->delayed_buffer = buffer;
			batch_queue->delay_count++;
			as_batch_delay_buffer(batch_queue);
		}
		else {
			// Batch is delayed - try only original delayed buffer.
			if (shared->delayed_buffer == buffer) {
				shared->delayed_buffer = NULL;

				if (as_batch_send_delayed(batch_queue, shared, buffer)) {
					batch_queue->delay_count--;
					continue;
				}

				if (as_batch_abandon(batch_queue, shared, buffer)) {
					batch_queue->delay_count--;
					continue;
				}

				// Socket blocked again.
				shared->delayed_buffer = buffer;
				as_batch_delay_buffer(batch_queue);
			}
			// else - delayed by another buffer in this batch, just re-queue.
		}

		cf_queue_push(response_queue, &response);
	}

	// Send back completion notification.
	uint32_t complete = 1;
	cf_queue_push(work->batch_queue->complete_queue, &complete);
}

static int
as_batch_create_thread_queues(uint32_t begin, uint32_t end)
{
	// Allocate one queue per batch response worker thread.
	int status = 0;

	as_batch_work work;
	work.complete = false;

	for (uint32_t i = begin; i < end; i++) {
		work.batch_queue = &batch_queues[i];
		work.batch_queue->response_queue = cf_queue_create(sizeof(as_batch_response), true);
		work.batch_queue->complete_queue = cf_queue_create(sizeof(uint32_t), true);
		work.batch_queue->tran_count = 0;
		work.batch_queue->delay_count = 0;
		work.batch_queue->active = true;

		int rc = as_thread_pool_queue_task_fixed(&batch_thread_pool, &work);

		if (rc) {
			cf_warning(AS_BATCH, "Failed to create batch thread %u: %d", i, rc);
			status = rc;
		}
	}
	return status;
}

static bool
as_batch_wait(uint32_t begin, uint32_t end)
{
	for (uint32_t i = begin; i < end; i++) {
		if (batch_queues[i].tran_count > 0) {
			return false;
		}
	}
	return true;
}

static int
as_batch_shutdown_thread_queues(uint32_t begin, uint32_t end)
{
	// Set excess queues to inactive.
	// Existing batch transactions will be allowed to complete.
	for (uint32_t i = begin; i < end; i++) {
		batch_queues[i].active = false;
	}

	// Wait till there are no more active batch transactions on the queues.
	// Timeout after 30 seconds.
	uint64_t limitus = cf_getus() + (1000 * 1000 * 30);
	usleep(50 * 1000); // Sleep 50ms
	do {
		if (as_batch_wait(begin, end)) {
			break;
		}
		usleep(500 * 1000); // Sleep 500ms

		if (cf_getus() > limitus) {
			cf_warning(AS_BATCH, "Batch shutdown threads failed on timeout. Transactions remain on queue.");
			// Reactivate queues.
			for (uint32_t i = begin; i < end; i++) {
				batch_queues[i].active = true;
			}
			return -1;
		}
	} while (true);

	// Send stop command to excess queues.
	as_batch_response response;
	memset(&response, 0, sizeof(as_batch_response));

	for (uint32_t i = begin; i < end; i++) {
		cf_queue_push(batch_queues[i].response_queue, &response);
	}

	// Wait for completion events.
	uint32_t complete;
	for (uint32_t i = begin; i < end; i++) {
		as_batch_queue* bq = &batch_queues[i];
		cf_queue_pop(bq->complete_queue, &complete, CF_QUEUE_FOREVER);
		cf_queue_destroy(bq->complete_queue);
		bq->complete_queue = 0;
		cf_queue_destroy(bq->response_queue);
		bq->response_queue = 0;
	}
	return 0;
}

static as_batch_queue*
as_batch_find_queue(int queue_index)
{
	// Search backwards for an active queue.
	for (int index = queue_index - 1; index >= 0; index--) {
		as_batch_queue* bq = &batch_queues[index];

		if (bq->active && cf_queue_sz(bq->response_queue) < g_config.batch_max_buffers_per_queue) {
			return bq;
		}
	}

	// Search forwards.
	for (int index = queue_index + 1; index < MAX_BATCH_THREADS; index++) {
		as_batch_queue* bq = &batch_queues[index];

		// If current queue is not active, future queues will not be active either.
		if (! bq->active) {
			break;
		}

		if (cf_queue_sz(bq->response_queue) < g_config.batch_max_buffers_per_queue) {
			return bq;
		}
	}
	return 0;
}

static as_batch_buffer*
as_batch_buffer_create(uint32_t size)
{
	as_batch_buffer* buffer = cf_malloc(size);
	buffer->capacity = size - batch_buffer_pool.header_size;
	cf_atomic64_incr(&g_stats.batch_index_created_buffers);
	return buffer;
}

static uint8_t*
as_batch_buffer_pop(as_batch_shared* shared, uint32_t size)
{
	as_batch_buffer* buffer;
	uint32_t mem_size = size + batch_buffer_pool.header_size;

	if (mem_size > batch_buffer_pool.buffer_size) {
		// Requested size is greater than fixed buffer size.
		// Allocate new buffer, but don't put back into pool.
		buffer = as_batch_buffer_create(mem_size);
		cf_atomic64_incr(&g_stats.batch_index_huge_buffers);
	}
	else {
		// Pop existing buffer from queue.
		// The extra lock here is unavoidable.
		int status = cf_queue_pop(batch_buffer_pool.queue, &buffer, CF_QUEUE_NOWAIT);

		if (status == CF_QUEUE_OK) {
			buffer->capacity = batch_buffer_pool.buffer_size - batch_buffer_pool.header_size;
		}
		else if (status == CF_QUEUE_EMPTY) {
			// Queue is empty. Create new buffer.
			buffer = as_batch_buffer_create(batch_buffer_pool.buffer_size);
		}
		else {
			cf_crash(AS_BATCH, "Failed to pop new batch buffer: %d", status);
		}
	}

	// Reserve a slot in new buffer.
	buffer->size = size;
	buffer->tran_count = 1;
	buffer->writers = 2;
	shared->buffer = buffer;
	return buffer->data;
}

static inline void
as_batch_buffer_complete(as_batch_shared* shared, as_batch_buffer* buffer)
{
	// Flush when all writers have finished writing into the buffer.
	if (cf_atomic32_decr(&buffer->writers) == 0) {
		as_batch_response response = {.shared = shared, .buffer = buffer};
		cf_queue_push(shared->response_queue, &response);
	}
}

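// Reserve "size" bytes of response space under the shared lock. Either the
// slot fits in the current buffer, or the current buffer is closed out and a
// fresh one is popped from the pool. Also records an error result code if one
// occurs, and reports via *complete whether this is the batch's final
// reservation.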
static uint8_t*
as_batch_reserve(as_batch_shared* shared, uint32_t size, int result_code, as_batch_buffer** buffer_out, bool* complete)
{
	as_batch_buffer* buffer;
	uint8_t* data;

	cf_mutex_lock(&shared->lock);
	*complete = (++shared->tran_count == shared->tran_max);
	buffer = shared->buffer;

	if (! buffer) {
		// No previous buffer. Get new buffer.
		data = as_batch_buffer_pop(shared, size);
		*buffer_out = shared->buffer;
		cf_mutex_unlock(&shared->lock);
	}
	else if (buffer->size + size <= buffer->capacity) {
		// Result fits into existing block. Reserve a slot.
		data = buffer->data + buffer->size;
		buffer->size += size;
		buffer->tran_count++;
		cf_atomic32_incr(&buffer->writers);
		*buffer_out = buffer;
		cf_mutex_unlock(&shared->lock);
	}
	else {
		// Result does not fit into existing block.
		// Make copy of existing buffer.
		as_batch_buffer* prev_buffer = buffer;

		// Get new buffer.
		data = as_batch_buffer_pop(shared, size);
		*buffer_out = shared->buffer;
		cf_mutex_unlock(&shared->lock);

		as_batch_buffer_complete(shared, prev_buffer);
	}

	if (! (result_code == AS_OK || result_code == AS_ERR_NOT_FOUND ||
			result_code == AS_ERR_FILTERED_OUT)) {
		// Result code can be set outside of lock because it doesn't matter which transaction's
		// result code is used as long as it's an error.
		shared->result_code = result_code;
	}
	return data;
}

static inline void
as_batch_transaction_end(as_batch_shared* shared, as_batch_buffer* buffer, bool complete)
{
	// This flush can only be triggered when the buffer is full.
	as_batch_buffer_complete(shared, buffer);

	if (complete) {
		// This flush only occurs when all transactions in batch have been processed.
		as_batch_buffer_complete(shared, buffer);
	}
}

static void
as_batch_terminate(as_batch_shared* shared, uint32_t tran_count, int result_code)
{
	// Terminate batch by adding phantom transactions to shared and buffer tran counts.
	// This is done so the memory is released at the end only once.
	as_batch_buffer* buffer;
	bool complete;

	cf_mutex_lock(&shared->lock);
	buffer = shared->buffer;
	shared->result_code = result_code;
	shared->tran_count += tran_count;
	complete = (shared->tran_count == shared->tran_max);

	if (! buffer) {
		// No previous buffer. Get new buffer.
		as_batch_buffer_pop(shared, 0);
		buffer = shared->buffer;
		buffer->tran_count = tran_count; // Override tran_count.
	}
	else {
		// Buffer exists. Add phantom transactions.
		buffer->tran_count += tran_count;
		cf_atomic32_incr(&buffer->writers);
	}
	cf_mutex_unlock(&shared->lock);
	as_batch_transaction_end(shared, buffer, complete);
}

//---------------------------------------------------------
// FUNCTIONS
//---------------------------------------------------------

int
as_batch_init()
{
	cf_mutex_init(&batch_resize_lock);

	// Default 'batch-index-threads' can't be set before call to cf_topo_init().
	if (g_config.n_batch_index_threads == 0) {
		g_config.n_batch_index_threads = cf_topo_count_cpus();
	}

	cf_info(AS_BATCH, "starting %u batch-index-threads", g_config.n_batch_index_threads);

	int rc = as_thread_pool_init_fixed(&batch_thread_pool, g_config.n_batch_index_threads, as_batch_worker,
			sizeof(as_batch_work), offsetof(as_batch_work, complete));

	if (rc) {
		cf_warning(AS_BATCH, "Failed to initialize batch-index-threads to %u: %d", g_config.n_batch_index_threads, rc);
		return rc;
	}

	rc = as_buffer_pool_init(&batch_buffer_pool, sizeof(as_batch_buffer), BATCH_BLOCK_SIZE);

	if (rc) {
		cf_warning(AS_BATCH, "Failed to initialize batch buffer pool: %d", rc);
		return rc;
	}

	rc = as_batch_create_thread_queues(0, g_config.n_batch_index_threads);

	if (rc) {
		return rc;
	}

	return 0;
}

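// Entry point for a batch request (derived from the flow below): validate the
// proto and security, parse the batch field, pick a response queue, then
// split the rows into individual read transactions that reuse the original
// message memory. Rows may be processed inline or queued to transaction
// threads.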
int
as_batch_queue_task(as_transaction* btr)
{
	uint64_t counter = cf_atomic64_incr(&g_stats.batch_index_initiate);
	uint32_t thread_size = batch_thread_pool.thread_size;

	if (thread_size == 0 || thread_size > MAX_BATCH_THREADS) {
		cf_warning(AS_BATCH, "batch-index-threads has been disabled: %u", thread_size);
		return as_batch_send_error(btr, AS_ERR_BATCH_DISABLED);
	}
	uint32_t queue_index = counter % thread_size;

	// Validate batch transaction
	as_proto* bproto = &btr->msgp->proto;

	if (bproto->sz > PROTO_SIZE_MAX) {
		cf_warning(AS_BATCH, "can't process message: invalid size %lu should be %d or less",
				(uint64_t)bproto->sz, PROTO_SIZE_MAX);
		return as_batch_send_error(btr, AS_ERR_PARAMETER);
	}

	if (bproto->type != PROTO_TYPE_AS_MSG) {
		cf_warning(AS_BATCH, "Invalid proto type. Expected %d Received %d", PROTO_TYPE_AS_MSG, bproto->type);
		return as_batch_send_error(btr, AS_ERR_PARAMETER);
	}

	// Check that the socket is authenticated.
	uint8_t result = as_security_check(btr->from.proto_fd_h, PERM_NONE);

	if (result != AS_OK) {
		as_security_log(btr->from.proto_fd_h, result, PERM_NONE, NULL, NULL);
		return as_batch_send_error(btr, result);
	}

	// Parse header
	as_msg* bmsg = &btr->msgp->msg;
	as_msg_swap_header(bmsg);

	// Parse fields
	uint8_t* limit = (uint8_t*)bmsg + bproto->sz;
	as_msg_field* mf = (as_msg_field*)bmsg->data;
	as_msg_field* end;
	as_msg_field* bf = 0;
	as_msg_field* predexp_mf = 0;

	for (int i = 0; i < bmsg->n_fields; i++) {
		if ((uint8_t*)mf >= limit) {
			cf_warning(AS_BATCH, "Batch field limit reached");
			return as_batch_send_error(btr, AS_ERR_PARAMETER);
		}
		as_msg_swap_field(mf);
		end = as_msg_field_get_next(mf);

		if (mf->type == AS_MSG_FIELD_TYPE_BATCH || mf->type == AS_MSG_FIELD_TYPE_BATCH_WITH_SET) {
			bf = mf;
		}
		else if (mf->type == AS_MSG_FIELD_TYPE_PREDEXP) {
			predexp_mf = mf;
		}

		mf = end;
	}

	if (! bf) {
		cf_warning(AS_BATCH, "Batch index field not found");
		return as_batch_send_error(btr, AS_ERR_PARAMETER);
	}

	// Parse batch field
	uint8_t* data = bf->data;
	uint32_t tran_count = cf_swap_from_be32(*(uint32_t*)data);
	data += sizeof(uint32_t);

	if (tran_count == 0) {
		cf_warning(AS_BATCH, "Batch request size is zero");
		return as_batch_send_error(btr, AS_ERR_PARAMETER);
	}

	if (tran_count > g_config.batch_max_requests) {
		cf_warning(AS_BATCH, "Batch request size %u exceeds max %u", tran_count, g_config.batch_max_requests);
		return as_batch_send_error(btr, AS_ERR_BATCH_MAX_REQUESTS);
	}

	// Initialize shared data
	as_batch_shared* shared = cf_malloc(sizeof(as_batch_shared));

	memset(shared, 0, sizeof(as_batch_shared));

	cf_mutex_init(&shared->lock);

	// Abandon batch at twice batch timeout if batch timeout is defined.
	// If batch timeout is zero, abandon after 30 seconds.
	shared->end = btr->start_time + ((bmsg->transaction_ttl != 0) ?
			((uint64_t)bmsg->transaction_ttl * 1000 * 1000 * 2) :
			BATCH_ABANDON_LIMIT);

	shared->start = btr->start_time;
	shared->fd_h = btr->from.proto_fd_h;
	shared->msgp = btr->msgp;
	shared->tran_max = tran_count;

	// Find batch queue to send transaction responses.
	as_batch_queue* batch_queue = &batch_queues[queue_index];

	// batch_max_buffers_per_queue is a soft limit, but still must be checked under lock.
	if (! (batch_queue->active && cf_queue_sz(batch_queue->response_queue) < g_config.batch_max_buffers_per_queue)) {
		// Queue buffer limit has been exceeded or thread has been shut down (probably due
		// to downwards thread resize). Search for an available queue.
		// cf_warning(AS_BATCH, "Queue %u full %d", queue_index, cf_queue_sz(batch_queue->response_queue));
		batch_queue = as_batch_find_queue(queue_index);

		if (! batch_queue) {
			cf_warning(AS_BATCH, "Failed to find active batch queue that is not full");
			cf_free(shared);
			return as_batch_send_error(btr, AS_ERR_BATCH_QUEUES_FULL);
		}
	}

	if (predexp_mf != NULL) {
		shared->predexp_mf = predexp_mf;

		if ((shared->predexp = predexp_build(predexp_mf)) == NULL) {
			cf_warning(AS_BATCH, "Failed to build batch predexp");
			cf_free(shared);
			return as_batch_send_error(btr, AS_ERR_PARAMETER);
		}
	}

	// Increment batch queue transaction count.
	cf_atomic32_incr(&batch_queue->tran_count);
	shared->response_queue = batch_queue->response_queue;

	// Initialize generic transaction.
	as_transaction tr;
	as_transaction_init_head(&tr, 0, 0);

	tr.origin = FROM_BATCH;
	tr.from_flags |= FROM_FLAG_BATCH_SUB;
	tr.start_time = btr->start_time;

	// Read batch keys and initialize generic transactions.
	as_batch_input* in;
	cl_msg* out = NULL;
	cl_msg* prev_msgp = NULL;
	as_msg_op* op;
	uint32_t tran_row = 0;
	uint8_t info = *data++; // allow transaction inline.

	bool allow_inline = (g_config.n_namespaces_inlined != 0 && info);
	bool check_inline = (allow_inline && g_config.n_namespaces_not_inlined != 0);
	bool should_inline = (allow_inline && g_config.n_namespaces_not_inlined == 0);
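	// Inline processing (derived from the flags above): the client's info
	// byte must allow it and at least one namespace must be configured
	// inline. If every namespace is inlined, all rows run inline; otherwise
	// each row's namespace is checked individually in the loop below.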

	// Split batch rows into separate single record read transactions.
	// The read transactions are located in the same memory block as
	// the original batch transactions. This allows us to avoid performing
	// an extra malloc for each transaction.
	while (tran_row < tran_count && data + BATCH_REPEAT_SIZE <= limit) {
		// Copy transaction data before memory gets overwritten.
		in = (as_batch_input*)data;

		tr.from.batch_shared = shared; // is set NULL after sub-transaction
		tr.from_data.batch_index = cf_swap_from_be32(in->index);
		tr.keyd = in->keyd;
		tr.benchmark_time = btr->benchmark_time; // must reset for each usage

		if (in->repeat) {
			if (! prev_msgp) {
				break; // bad bytes from client - repeat set on first item
			}

			// Row should use previous namespace and bin names.
			data += BATCH_REPEAT_SIZE;
			tr.msgp = prev_msgp;
		}
		else {
			tr.msg_fields = 0; // erase previous AS_MSG_FIELD_BIT_SET flag, if any
			as_transaction_set_msg_field_flag(&tr, AS_MSG_FIELD_TYPE_NAMESPACE);

			// Row contains full namespace/bin names.
			out = (cl_msg*)data;

			if (data + sizeof(cl_msg) + sizeof(as_msg_field) > limit) {
				break;
			}

			out->msg.header_sz = sizeof(as_msg);
			out->msg.info1 = in->info1;
			out->msg.info2 = 0;
			out->msg.info3 = bmsg->info3 &
					(AS_MSG_INFO3_SC_READ_RELAX | AS_MSG_INFO3_SC_READ_TYPE);
			out->msg.unused = 0;
			out->msg.result_code = 0;
			out->msg.generation = 0;
			out->msg.record_ttl = 0;
			out->msg.transaction_ttl = bmsg->transaction_ttl; // already swapped
			// n_fields/n_ops is in exact same place on both input/output, but the value still
			// needs to be swapped.
			out->msg.n_fields = cf_swap_from_be16(in->n_fields);

			// Older clients sent zero, but always sent namespace. Adjust this.
			if (out->msg.n_fields == 0) {
				out->msg.n_fields = 1;
			}

			out->msg.n_ops = cf_swap_from_be16(in->n_ops);

			// Namespace input is same as namespace field, so just leave in place and swap.
			data += sizeof(cl_msg);
			mf = (as_msg_field*)data;
			as_msg_swap_field(mf);
			if (check_inline) {
				as_namespace* ns = as_namespace_get_bymsgfield(mf);
				should_inline = ns && ns->storage_data_in_memory;
			}
			mf = as_msg_field_get_next(mf);
			data = (uint8_t*)mf;

			// Swap remaining fields.
			for (uint16_t j = 1; j < out->msg.n_fields; j++) {
				if (data + sizeof(as_msg_field) > limit) {
					goto TranEnd;
				}

				if (mf->type == AS_MSG_FIELD_TYPE_SET) {
					as_transaction_set_msg_field_flag(&tr, AS_MSG_FIELD_TYPE_SET);
				}

				as_msg_swap_field(mf);
				mf = as_msg_field_get_next(mf);
				data = (uint8_t*)mf;
			}

			if (out->msg.n_ops) {
				// Bin names input is same as transaction ops, so just leave in place and swap.
				uint16_t n_ops = out->msg.n_ops;
				for (uint16_t j = 0; j < n_ops; j++) {
					if (data + sizeof(as_msg_op) > limit) {
						goto TranEnd;
					}
					op = (as_msg_op*)data;
					as_msg_swap_op(op);
					op = as_msg_op_get_next(op);
					data = (uint8_t*)op;
				}
			}

			// Initialize msg header.
			out->proto.version = PROTO_VERSION;
			out->proto.type = PROTO_TYPE_AS_MSG;
			out->proto.sz = (data - (uint8_t*)&out->msg);
			tr.msgp = out;
			prev_msgp = out;
		}

		if (data > limit) {
			break;
		}

		// Submit transaction.
		if (should_inline) {
			as_tsvc_process_transaction(&tr);
		}
		else {
			// Queue transaction to be processed by a transaction thread.
			as_service_enqueue_internal(&tr);
		}
		tran_row++;
	}

TranEnd:
	if (tran_row < tran_count) {
		// Mismatch between tran_count and actual data. Terminate transaction.
		cf_warning(AS_BATCH, "Batch keys mismatch. Expected %u Received %u", tran_count, tran_row);
		as_batch_terminate(shared, tran_count - tran_row, AS_ERR_PARAMETER);
	}

	// Reset original socket because the socket is now owned by batch shared.
	btr->from.proto_fd_h = NULL;
	return 0;
}

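// Append one record result to the batch response. The reply is laid out as an
// as_msg header (transaction_ttl is overloaded to carry the batch index), a
// digest field identifying the record, then one op per bin.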
void
as_batch_add_result(as_transaction* tr, uint16_t n_bins, as_bin** bins,
		as_msg_op** ops)
{
	as_namespace* ns = tr->rsv.ns;

	// Calculate size.
	size_t size = sizeof(as_msg);
	size += sizeof(as_msg_field) + sizeof(cf_digest);

	uint16_t n_fields = 1;

	for (uint16_t i = 0; i < n_bins; i++) {
		as_bin* bin = bins[i];
		size += sizeof(as_msg_op);

		if (ops) {
			size += ops[i]->name_sz;
		}
		else if (bin) {
			size += ns->single_bin ? 0 : strlen(as_bin_get_name_from_id(ns, bin->id));
		}
		else {
			cf_crash(AS_BATCH, "making response message with null bin and op");
		}

		if (bin) {
			size += as_bin_particle_client_value_size(bin);
		}
	}

	as_batch_shared* shared = tr->from.batch_shared;

	as_batch_buffer* buffer;
	bool complete;
	uint8_t* data = as_batch_reserve(shared, size, tr->result_code, &buffer, &complete);

	if (data) {
		// Write header.
		uint8_t* p = data;
		as_msg* m = (as_msg*)p;
		m->header_sz = sizeof(as_msg);
		m->info1 = 0;
		m->info2 = 0;
		m->info3 = 0;
		m->unused = 0;
		m->result_code = tr->result_code;
		m->generation = plain_generation(tr->generation, ns);
		m->record_ttl = tr->void_time;

		// Overload transaction_ttl to store batch index.
		m->transaction_ttl = tr->from_data.batch_index;

		m->n_fields = n_fields;
		m->n_ops = n_bins;
		as_msg_swap_header(m);
		p += sizeof(as_msg);

		as_msg_field* field = (as_msg_field*)p;
		field->field_sz = sizeof(cf_digest) + 1;
		field->type = AS_MSG_FIELD_TYPE_DIGEST_RIPE;
		memcpy(field->data, &tr->keyd, sizeof(cf_digest));
		as_msg_swap_field(field);
		p += sizeof(as_msg_field) + sizeof(cf_digest);

		for (uint16_t i = 0; i < n_bins; i++) {
			as_bin* bin = bins[i];
			as_msg_op* op = (as_msg_op*)p;
			op->op = AS_MSG_OP_READ;
			op->version = 0;

			if (ops) {
				as_msg_op* src = ops[i];
				memcpy(op->name, src->name, src->name_sz);
				op->name_sz = src->name_sz;
			}
			else {
				op->name_sz = as_bin_memcpy_name(ns, op->name, bin);
			}

			op->op_sz = OP_FIXED_SZ + op->name_sz;
			p += sizeof(as_msg_op) + op->name_sz;
			p += as_bin_particle_to_client(bin, op);
			as_msg_swap_op(op);
		}
	}
	as_batch_transaction_end(shared, buffer, complete);
}

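// Append a response that arrived from a proxied sub-transaction. The proxy
// reply is already in wire byte order, so the header is copied as-is (with
// n_fields bumped), the digest field is spliced in after it, and the
// remaining fields and ops follow.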
void
as_batch_add_proxy_result(as_batch_shared* shared, uint32_t index, cf_digest* digest, cl_msg* cmsg, size_t proxy_size)
{
	as_msg* msg = &cmsg->msg;
	size_t size = proxy_size + sizeof(as_msg_field) + sizeof(cf_digest) - sizeof(as_proto);

	as_batch_buffer* buffer;
	bool complete;
	uint8_t* data = as_batch_reserve(shared, size, msg->result_code, &buffer, &complete);

	if (data) {
		// Overload transaction_ttl to store batch index.
		msg->transaction_ttl = htonl(index);

		// Write header
		uint16_t n_fields = ntohs(msg->n_fields);
		msg->n_fields = htons(n_fields + 1);
		memcpy(data, msg, sizeof(as_msg));
		uint8_t* trg = data + sizeof(as_msg);

		// Write digest field
		as_msg_field* field = (as_msg_field*)trg;
		field->field_sz = sizeof(cf_digest) + 1;
		field->type = AS_MSG_FIELD_TYPE_DIGEST_RIPE;
		memcpy(field->data, digest, sizeof(cf_digest));
		as_msg_swap_field(field);
		trg += sizeof(as_msg_field) + sizeof(cf_digest);

		// Copy other fields and ops.
		size = ((uint8_t*)cmsg + proxy_size) - msg->data;
		memcpy(trg, msg->data, size);
	}
	as_batch_transaction_end(shared, buffer, complete);
}

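// Append an error-only result for one batch row - a bare as_msg header with
// no fields or ops, again carrying the batch index in transaction_ttl.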
void
as_batch_add_error(as_batch_shared* shared, uint32_t index, int result_code)
{
	as_batch_buffer* buffer;
	bool complete;
	uint8_t* data = as_batch_reserve(shared, sizeof(as_msg), result_code, &buffer, &complete);

	if (data) {
		// Write error.
		as_msg* m = (as_msg*)data;
		m->header_sz = sizeof(as_msg);
		m->info1 = 0;
		m->info2 = 0;
		m->info3 = 0;
		m->unused = 0;
		m->result_code = result_code;
		m->generation = 0;
		m->record_ttl = 0;
		// Overload transaction_ttl to store batch index.
		m->transaction_ttl = index;
		m->n_fields = 0;
		m->n_ops = 0;
		as_msg_swap_header(m);
	}
	as_batch_transaction_end(shared, buffer, complete);
}

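// Resize the batch response thread pool. Order matters here: on growth,
// threads are added before their queues are created; on shrink, the excess
// queues are drained and shut down before the threads are stopped.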
int
as_batch_threads_resize(uint32_t threads)
{
	if (threads > MAX_BATCH_THREADS) {
		cf_warning(AS_BATCH, "batch-index-threads %u exceeds max %u", threads, MAX_BATCH_THREADS);
		return -1;
	}

	cf_mutex_lock(&batch_resize_lock);

	// Resize thread pool. The threads will wait for graceful shutdown on downwards resize.
	uint32_t threads_orig = batch_thread_pool.thread_size;
	cf_info(AS_BATCH, "Resize batch-index-threads from %u to %u", threads_orig, threads);
	int status = 0;

	if (threads != threads_orig) {
		if (threads > threads_orig) {
			// Increase threads before initializing queues.
			status = as_thread_pool_resize(&batch_thread_pool, threads);

			if (status == 0) {
				g_config.n_batch_index_threads = threads;
				// Adjust queues to match new thread size.
				status = as_batch_create_thread_queues(threads_orig, threads);
			}
			else {
				// Show warning, but keep going as some threads may have been successfully added/removed.
				cf_warning(AS_BATCH, "Failed to resize batch-index-threads. status=%d, batch-index-threads=%u",
						status, g_config.n_batch_index_threads);
				threads = batch_thread_pool.thread_size;

				if (threads > threads_orig) {
					g_config.n_batch_index_threads = threads;
					// Adjust queues to match new thread size.
					status = as_batch_create_thread_queues(threads_orig, threads);
				}
			}
		}
		else {
			// Shutdown queues before shutting down threads.
			status = as_batch_shutdown_thread_queues(threads, threads_orig);

			if (status == 0) {
				// Adjust threads to match new queue size.
				status = as_thread_pool_resize(&batch_thread_pool, threads);
				g_config.n_batch_index_threads = batch_thread_pool.thread_size;

				if (status) {
					cf_warning(AS_BATCH, "Failed to resize batch-index-threads. status=%d, batch-index-threads=%u",
							status, g_config.n_batch_index_threads);
				}
			}
		}
	}
	cf_mutex_unlock(&batch_resize_lock);
	return status;
}

void
as_batch_queues_info(cf_dyn_buf* db)
{
	cf_mutex_lock(&batch_resize_lock);

	uint32_t max = batch_thread_pool.thread_size;

	for (uint32_t i = 0; i < max; i++) {
		if (i > 0) {
			cf_dyn_buf_append_char(db, ',');
		}
		as_batch_queue* bq = &batch_queues[i];
		cf_dyn_buf_append_uint32(db, bq->tran_count); // Batch count
		cf_dyn_buf_append_char(db, ':');
		cf_dyn_buf_append_int(db, cf_queue_sz(bq->response_queue)); // Buffer count
	}
	cf_mutex_unlock(&batch_resize_lock);
}

int
as_batch_unused_buffers()
{
	return cf_queue_sz(batch_buffer_pool.queue);
}

// Not currently called. Kept as a placeholder in case the server decides to
// implement clean shutdowns in the future.
void
as_batch_destroy()
{
	as_thread_pool_destroy(&batch_thread_pool);
	as_buffer_pool_destroy(&batch_buffer_pool);

	cf_mutex_lock(&batch_resize_lock);
	as_batch_shutdown_thread_queues(0, batch_thread_pool.thread_size);
	cf_mutex_unlock(&batch_resize_lock);
	cf_mutex_destroy(&batch_resize_lock);
}

as_file_handle*
as_batch_get_fd_h(as_batch_shared* shared)
{
	return shared->fd_h;
}

as_msg_field*
as_batch_get_predexp_mf(as_batch_shared* shared)
{
	return shared->predexp_mf;
}

predexp_eval_t*
as_batch_get_predexp(as_batch_shared* shared)
{
	return shared->predexp;
}