1 | /* |
2 | * batch.c |
3 | * |
4 | * Copyright (C) 2012-2015 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | #include "base/batch.h" |
23 | #include "aerospike/as_buffer_pool.h" |
24 | #include "aerospike/as_thread_pool.h" |
25 | #include "citrusleaf/alloc.h" |
26 | #include "citrusleaf/cf_atomic.h" |
27 | #include "citrusleaf/cf_byte_order.h" |
28 | #include "citrusleaf/cf_clock.h" |
29 | #include "citrusleaf/cf_digest.h" |
30 | #include "citrusleaf/cf_queue.h" |
31 | #include "base/cfg.h" |
32 | #include "base/datamodel.h" |
33 | #include "base/index.h" |
34 | #include "base/predexp.h" |
35 | #include "base/proto.h" |
36 | #include "base/security.h" |
37 | #include "base/service.h" |
38 | #include "base/stats.h" |
39 | #include "base/thr_tsvc.h" |
40 | #include "base/transaction.h" |
41 | #include "cf_mutex.h" |
42 | #include "hardware.h" |
43 | #include "socket.h" |
44 | #include <errno.h> |
45 | #include <pthread.h> |
46 | #include <unistd.h> |
47 | |
48 | //--------------------------------------------------------- |
49 | // MACROS |
50 | //--------------------------------------------------------- |
51 | |
52 | #define BATCH_BLOCK_SIZE (1024 * 128) // 128K |
#define BATCH_REPEAT_SIZE 25 // index (4) + digest (20) + repeat (1)
54 | |
#define BATCH_ABANDON_LIMIT (30UL * 1000 * 1000 * 1000) // 30 seconds, in nanoseconds
56 | |
57 | #define BATCH_SUCCESS 0 |
58 | #define BATCH_ERROR -1 |
59 | #define BATCH_DELAY 1 |
60 | |
61 | //--------------------------------------------------------- |
62 | // TYPES |
63 | //--------------------------------------------------------- |
64 | |
65 | // Pad batch input header to 30 bytes which is also the size of a transaction header. |
66 | // This allows the input memory to be used as transaction cl_msg memory. |
67 | // This saves a large number of memory allocations while allowing different |
68 | // namespaces/bin name filters to be in the same batch. |
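// Layout: index (4) + digest (20) + repeat (1) + info1 (1) + n_fields (2) +
// n_ops (2) = 30 bytes.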
69 | typedef struct { |
70 | uint32_t index; |
71 | cf_digest keyd; |
72 | uint8_t repeat; |
73 | uint8_t info1; |
74 | uint16_t n_fields; |
75 | uint16_t n_ops; |
76 | } __attribute__((__packed__)) as_batch_input; |
77 | |
78 | typedef struct { |
79 | uint32_t capacity; |
80 | uint32_t size; |
81 | uint32_t tran_count; |
82 | cf_atomic32 writers; |
83 | as_proto proto; |
84 | uint8_t data[]; |
85 | } __attribute__((__packed__)) as_batch_buffer; |
86 | |
87 | struct as_batch_shared_s { |
88 | cf_mutex lock; |
89 | cf_queue* response_queue; |
90 | as_file_handle* fd_h; |
91 | cl_msg* msgp; |
92 | as_batch_buffer* buffer; |
93 | uint64_t start; |
94 | uint64_t end; |
95 | uint32_t tran_count_response; |
96 | uint32_t tran_count; |
97 | uint32_t tran_max; |
98 | uint32_t buffer_offset; |
99 | as_batch_buffer* delayed_buffer; |
100 | int result_code; |
101 | bool in_trailer; |
102 | bool bad_response_fd; |
103 | as_msg_field* predexp_mf; |
104 | predexp_eval_t* predexp; |
105 | }; |
106 | |
107 | typedef struct { |
108 | as_batch_shared* shared; |
109 | as_batch_buffer* buffer; |
110 | } as_batch_response; |
111 | |
112 | typedef struct { |
113 | cf_queue* response_queue; |
114 | cf_queue* complete_queue; |
115 | cf_atomic32 tran_count; |
116 | uint32_t delay_count; |
117 | volatile bool active; |
118 | } as_batch_queue; |
119 | |
120 | typedef struct { |
121 | as_batch_queue* batch_queue; |
122 | bool complete; |
123 | } as_batch_work; |
124 | |
125 | //--------------------------------------------------------- |
126 | // STATIC DATA |
127 | //--------------------------------------------------------- |
128 | |
129 | static as_thread_pool batch_thread_pool; |
130 | static as_buffer_pool batch_buffer_pool; |
131 | |
132 | static as_batch_queue batch_queues[MAX_BATCH_THREADS]; |
133 | static cf_mutex batch_resize_lock; |
134 | |
135 | //--------------------------------------------------------- |
136 | // STATIC FUNCTIONS |
137 | //--------------------------------------------------------- |
138 | |
139 | static int |
140 | as_batch_send_error(as_transaction* btr, int result_code) |
141 | { |
142 | // Send error back to client for batch transaction that failed before |
143 | // placing any sub-transactions on transaction queue. |
144 | cl_msg m; |
145 | m.proto.version = PROTO_VERSION; |
146 | m.proto.type = PROTO_TYPE_AS_MSG; |
147 | m.proto.sz = sizeof(as_msg); |
148 | as_proto_swap(&m.proto); |
149 | m.msg.header_sz = sizeof(as_msg); |
150 | m.msg.info1 = 0; |
151 | m.msg.info2 = 0; |
152 | m.msg.info3 = AS_MSG_INFO3_LAST; |
153 | m.msg.unused = 0; |
154 | m.msg.result_code = result_code; |
155 | m.msg.generation = 0; |
156 | m.msg.record_ttl = 0; |
157 | m.msg.transaction_ttl = 0; |
158 | m.msg.n_fields = 0; |
159 | m.msg.n_ops = 0; |
160 | as_msg_swap_header(&m.msg); |
161 | |
162 | cf_socket* sock = &btr->from.proto_fd_h->sock; |
163 | int status = 0; |
164 | |
	// Use blocking send because the error occurred before the batch
	// transaction was placed on the batch queue.
167 | if (cf_socket_send_all(sock, (uint8_t*)&m, sizeof(m), MSG_NOSIGNAL, CF_SOCKET_TIMEOUT) < 0) { |
168 | // Common when a client aborts. |
169 | cf_debug(AS_BATCH, "Batch send response error, errno %d fd %d" , errno, CSFD(sock)); |
170 | status = -1; |
171 | } |
172 | |
173 | as_end_of_transaction(btr->from.proto_fd_h, status != 0); |
174 | btr->from.proto_fd_h = NULL; |
175 | |
176 | cf_free(btr->msgp); |
177 | btr->msgp = 0; |
178 | |
179 | if (result_code == AS_ERR_TIMEOUT) { |
180 | cf_atomic64_incr(&g_stats.batch_index_timeout); |
181 | } |
182 | else { |
183 | cf_atomic64_incr(&g_stats.batch_index_errors); |
184 | } |
185 | return status; |
186 | } |
187 | |
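// Try to send the buffer's remaining bytes without blocking. Returns
// BATCH_SUCCESS when fully sent, BATCH_DELAY on a partial send (progress is
// saved in shared->buffer_offset), or BATCH_ERROR on a socket error.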
188 | static int |
189 | as_batch_send_buffer(as_batch_shared* shared, as_batch_buffer* buffer, int32_t flags) |
190 | { |
191 | cf_socket* sock = &shared->fd_h->sock; |
192 | uint8_t* buf = (uint8_t*)&buffer->proto; |
193 | size_t size = sizeof(as_proto) + buffer->size; |
194 | size_t off = shared->buffer_offset; |
195 | size_t remaining = size - off; |
196 | |
197 | ssize_t sent = cf_socket_try_send_all(sock, buf + off, remaining, flags); |
198 | |
199 | if (sent < 0) { |
200 | shared->bad_response_fd = true; |
201 | return BATCH_ERROR; |
202 | } |
203 | |
	if ((size_t)sent < remaining) {
205 | shared->buffer_offset += sent; |
206 | return BATCH_DELAY; |
207 | } |
208 | |
209 | return BATCH_SUCCESS; |
210 | } |
211 | |
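// Return a buffer to the pool, subject to the batch-max-unused-buffers cap.
// Buffers the pool rejects (e.g. oversized ones) are freed by the push.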
212 | static inline void |
213 | as_batch_release_buffer(as_batch_shared* shared, as_batch_buffer* buffer) |
214 | { |
215 | if (as_buffer_pool_push_limit(&batch_buffer_pool, buffer, buffer->capacity, |
216 | g_config.batch_max_unused_buffers) != 0) { |
		// The push frees the buffer on failure, so just increment the stat.
218 | cf_atomic64_incr(&g_stats.batch_index_destroyed_buffers); |
219 | } |
220 | } |
221 | |
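// Finish the batch: close out the client connection handle, record latency
// and statistics, and free all shared batch state.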
222 | static void |
223 | as_batch_complete(as_batch_queue* queue, as_batch_shared* shared, int status) |
224 | { |
225 | as_end_of_transaction(shared->fd_h, status != 0); |
226 | shared->fd_h = NULL; |
227 | |
228 | // For now the model is timeouts don't appear in histograms. |
229 | if (shared->result_code != AS_ERR_TIMEOUT) { |
230 | G_HIST_ACTIVATE_INSERT_DATA_POINT(batch_index_hist, shared->start); |
231 | } |
232 | |
233 | // Check return code in order to update statistics. |
234 | if (status == 0 && shared->result_code == 0) { |
235 | cf_atomic64_incr(&g_stats.batch_index_complete); |
236 | } |
237 | else { |
238 | if (shared->result_code == AS_ERR_TIMEOUT) { |
239 | cf_atomic64_incr(&g_stats.batch_index_timeout); |
240 | } |
241 | else { |
242 | cf_atomic64_incr(&g_stats.batch_index_errors); |
243 | } |
244 | } |
245 | |
246 | // Destroy lock |
247 | cf_mutex_destroy(&shared->lock); |
248 | |
249 | // Release memory |
250 | predexp_destroy(shared->predexp); |
251 | cf_free(shared->msgp); |
252 | cf_free(shared); |
253 | |
254 | // It's critical that this count is decremented after the transaction is |
255 | // completely finished with the queue because "shutdown threads" relies |
256 | // on this information when performing graceful shutdown. |
257 | cf_atomic32_decr(&queue->tran_count); |
258 | } |
259 | |
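// Send the terminating as_msg (with AS_MSG_INFO3_LAST) that marks the end of
// the batch response, reusing the last buffer. Returns false if the socket
// would block and the send must be retried.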
260 | static bool |
261 | as_batch_send_trailer(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer) |
262 | { |
263 | // Use existing buffer to store trailer message. |
264 | as_proto* proto = &buffer->proto; |
265 | proto->version = PROTO_VERSION; |
266 | proto->type = PROTO_TYPE_AS_MSG; |
267 | proto->sz = sizeof(as_msg); |
268 | as_proto_swap(proto); |
269 | |
270 | as_msg* msg = (as_msg*)buffer->data; |
271 | msg->header_sz = sizeof(as_msg); |
272 | msg->info1 = 0; |
273 | msg->info2 = 0; |
274 | msg->info3 = AS_MSG_INFO3_LAST; |
275 | msg->unused = 0; |
276 | msg->result_code = shared->result_code; |
277 | msg->generation = 0; |
278 | msg->record_ttl = 0; |
279 | msg->transaction_ttl = 0; |
280 | msg->n_fields = 0; |
281 | msg->n_ops = 0; |
282 | as_msg_swap_header(msg); |
283 | |
284 | buffer->size = sizeof(as_msg); |
285 | shared->buffer_offset = 0; |
286 | |
287 | int status = as_batch_send_buffer(shared, buffer, MSG_NOSIGNAL); |
288 | |
289 | if (status == BATCH_DELAY) { |
290 | return false; |
291 | } |
292 | |
293 | as_batch_release_buffer(shared, buffer); |
294 | as_batch_complete(queue, shared, status); |
295 | return true; |
296 | } |
297 | |
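// Account for a buffer whose send attempt has finished (or was skipped due
// to error) - free it, and when it was the batch's last buffer, send the
// trailer or complete the batch. Returns false only if sending the trailer
// would block.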
298 | static inline bool |
299 | as_batch_buffer_end(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer, int status) |
300 | { |
301 | // If we're invoked for the trailer, we're done. Free buffer and batch. |
302 | if (shared->in_trailer) { |
303 | as_batch_release_buffer(shared, buffer); |
304 | as_batch_complete(queue, shared, status); |
305 | return true; |
306 | } |
307 | |
308 | shared->tran_count_response += buffer->tran_count; |
309 | |
310 | // We haven't yet reached the last buffer. Free buffer. |
311 | if (shared->tran_count_response < shared->tran_max) { |
312 | as_batch_release_buffer(shared, buffer); |
313 | return true; |
314 | } |
315 | |
316 | // We've reached the last buffer. If we cannot send a trailer, then we're |
317 | // done. Free buffer and batch. |
318 | if (shared->bad_response_fd) { |
319 | as_batch_release_buffer(shared, buffer); |
320 | as_batch_complete(queue, shared, status); |
321 | return true; |
322 | } |
323 | |
324 | // Reuse the last buffer for the trailer. |
325 | shared->in_trailer = true; |
326 | return as_batch_send_trailer(queue, shared, buffer); |
327 | } |
328 | |
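// Check whether the batch has passed its abandon deadline. If so, mark the
// response socket bad, end the buffer and batch, and return true.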
329 | static inline bool |
330 | as_batch_abandon(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer) |
331 | { |
332 | if (cf_getns() >= shared->end) { |
333 | cf_warning(AS_BATCH, "abandoned batch from %s with %u transactions after %lu ms" , |
334 | shared->fd_h->client, shared->tran_max, |
335 | (shared->end - shared->start) / 1000000); |
336 | shared->bad_response_fd = true; |
337 | as_batch_buffer_end(queue, shared, buffer, BATCH_ERROR); |
338 | return true; |
339 | } |
340 | |
341 | return false; |
342 | } |
343 | |
344 | static bool |
345 | as_batch_send_delayed(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer) |
346 | { |
347 | // If we get here, other buffers in this batch can't have affected or |
348 | // reacted to this batch's status. All that happened is this buffer was |
349 | // delayed. Therefore, we don't need to check for error conditions. |
350 | |
351 | int status = as_batch_send_buffer(shared, buffer, MSG_NOSIGNAL | MSG_MORE); |
352 | |
353 | if (status == BATCH_DELAY) { |
354 | return false; |
355 | } |
356 | |
357 | return as_batch_buffer_end(queue, shared, buffer, status); |
358 | } |
359 | |
360 | static bool |
361 | as_batch_send_response(as_batch_queue* queue, as_batch_shared* shared, as_batch_buffer* buffer) |
362 | { |
	cf_assert(buffer->capacity != 0, AS_BATCH, "buffer capacity 0");
364 | |
365 | // Don't send buffer if an error has already occurred. |
366 | if (shared->bad_response_fd || shared->result_code) { |
367 | return as_batch_buffer_end(queue, shared, buffer, BATCH_ERROR); |
368 | } |
369 | |
370 | shared->buffer_offset = 0; |
371 | |
372 | // Send buffer block to client socket. |
373 | buffer->proto.version = PROTO_VERSION; |
374 | buffer->proto.type = PROTO_TYPE_AS_MSG; |
375 | buffer->proto.sz = buffer->size; |
376 | as_proto_swap(&buffer->proto); |
377 | |
378 | int status = as_batch_send_buffer(shared, buffer, MSG_NOSIGNAL | MSG_MORE); |
379 | |
380 | if (status == BATCH_DELAY) { |
381 | return false; |
382 | } |
383 | |
384 | return as_batch_buffer_end(queue, shared, buffer, status); |
385 | } |
386 | |
387 | static inline void |
388 | as_batch_delay_buffer(as_batch_queue* queue) |
389 | { |
390 | cf_atomic64_incr(&g_stats.batch_index_delay); |
391 | |
392 | // If all batch transactions on this thread are delayed, avoid tight loop. |
393 | if (queue->tran_count == queue->delay_count) { |
394 | pthread_yield(); // not cf_thread_yield() - we're using as_thread_pool |
395 | } |
396 | } |
397 | |
398 | static void |
399 | as_batch_worker(void* udata) |
400 | { |
401 | // Send batch data to client, one buffer block at a time. |
402 | as_batch_work* work = (as_batch_work*)udata; |
403 | as_batch_queue* batch_queue = work->batch_queue; |
404 | cf_queue* response_queue = batch_queue->response_queue; |
405 | as_batch_response response; |
406 | as_batch_shared* shared; |
407 | as_batch_buffer* buffer; |
408 | |
409 | while (cf_queue_pop(response_queue, &response, CF_QUEUE_FOREVER) == CF_QUEUE_OK) { |
410 | // Check if this thread task should end. |
411 | shared = response.shared; |
412 | if (! shared) { |
413 | break; |
414 | } |
415 | |
416 | buffer = response.buffer; |
417 | |
418 | if (! shared->delayed_buffer) { |
419 | if (as_batch_send_response(batch_queue, shared, buffer)) { |
420 | continue; |
421 | } |
422 | |
423 | if (as_batch_abandon(batch_queue, shared, buffer)) { |
424 | continue; |
425 | } |
426 | |
427 | // Socket blocked. |
428 | shared->delayed_buffer = buffer; |
429 | batch_queue->delay_count++; |
430 | as_batch_delay_buffer(batch_queue); |
431 | } |
432 | else { |
433 | // Batch is delayed - try only original delayed buffer. |
434 | if (shared->delayed_buffer == buffer) { |
435 | shared->delayed_buffer = NULL; |
436 | |
437 | if (as_batch_send_delayed(batch_queue, shared, buffer)) { |
438 | batch_queue->delay_count--; |
439 | continue; |
440 | } |
441 | |
442 | if (as_batch_abandon(batch_queue, shared, buffer)) { |
443 | batch_queue->delay_count--; |
444 | continue; |
445 | } |
446 | |
447 | // Socket blocked again. |
448 | shared->delayed_buffer = buffer; |
449 | as_batch_delay_buffer(batch_queue); |
450 | } |
451 | // else - delayed by another buffer in this batch, just re-queue. |
452 | } |
453 | |
454 | cf_queue_push(response_queue, &response); |
455 | } |
456 | |
457 | // Send back completion notification. |
458 | uint32_t complete = 1; |
459 | cf_queue_push(work->batch_queue->complete_queue, &complete); |
460 | } |
461 | |
462 | static int |
463 | as_batch_create_thread_queues(uint32_t begin, uint32_t end) |
464 | { |
465 | // Allocate one queue per batch response worker thread. |
466 | int status = 0; |
467 | |
468 | as_batch_work work; |
469 | work.complete = false; |
470 | |
471 | for (uint32_t i = begin; i < end; i++) { |
472 | work.batch_queue = &batch_queues[i]; |
473 | work.batch_queue->response_queue = cf_queue_create(sizeof(as_batch_response), true); |
474 | work.batch_queue->complete_queue = cf_queue_create(sizeof(uint32_t), true); |
475 | work.batch_queue->tran_count = 0; |
476 | work.batch_queue->delay_count = 0; |
477 | work.batch_queue->active = true; |
478 | |
479 | int rc = as_thread_pool_queue_task_fixed(&batch_thread_pool, &work); |
480 | |
481 | if (rc) { |
482 | cf_warning(AS_BATCH, "Failed to create batch thread %u: %d" , i, rc); |
483 | status = rc; |
484 | } |
485 | } |
486 | return status; |
487 | } |
488 | |
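// Return true when no batch transactions remain on the queues in
// [begin, end).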
489 | static bool |
490 | as_batch_wait(uint32_t begin, uint32_t end) |
491 | { |
492 | for (uint32_t i = begin; i < end; i++) { |
493 | if (batch_queues[i].tran_count > 0) { |
494 | return false; |
495 | } |
496 | } |
497 | return true; |
498 | } |
499 | |
500 | static int |
501 | as_batch_shutdown_thread_queues(uint32_t begin, uint32_t end) |
502 | { |
503 | // Set excess queues to inactive. |
504 | // Existing batch transactions will be allowed to complete. |
505 | for (uint32_t i = begin; i < end; i++) { |
506 | batch_queues[i].active = false; |
507 | } |
508 | |
	// Wait until there are no more active batch transactions on the queues.
510 | // Timeout after 30 seconds. |
511 | uint64_t limitus = cf_getus() + (1000 * 1000 * 30); |
512 | usleep(50 * 1000); // Sleep 50ms |
513 | do { |
514 | if (as_batch_wait(begin, end)) { |
515 | break; |
516 | } |
517 | usleep(500 * 1000); // Sleep 500ms |
518 | |
519 | if (cf_getus() > limitus) { |
520 | cf_warning(AS_BATCH, "Batch shutdown threads failed on timeout. Transactions remain on queue." ); |
521 | // Reactivate queues. |
522 | for (uint32_t i = begin; i < end; i++) { |
523 | batch_queues[i].active = true; |
524 | } |
525 | return -1; |
526 | } |
527 | } while (true); |
528 | |
529 | // Send stop command to excess queues. |
530 | as_batch_response response; |
531 | memset(&response, 0, sizeof(as_batch_response)); |
532 | |
533 | for (uint32_t i = begin; i < end; i++) { |
534 | cf_queue_push(batch_queues[i].response_queue, &response); |
535 | } |
536 | |
537 | // Wait for completion events. |
538 | uint32_t complete; |
539 | for (uint32_t i = begin; i < end; i++) { |
540 | as_batch_queue* bq = &batch_queues[i]; |
541 | cf_queue_pop(bq->complete_queue, &complete, CF_QUEUE_FOREVER); |
542 | cf_queue_destroy(bq->complete_queue); |
543 | bq->complete_queue = 0; |
544 | cf_queue_destroy(bq->response_queue); |
545 | bq->response_queue = 0; |
546 | } |
547 | return 0; |
548 | } |
549 | |
550 | static as_batch_queue* |
551 | as_batch_find_queue(int queue_index) |
552 | { |
553 | // Search backwards for an active queue. |
554 | for (int index = queue_index - 1; index >= 0; index--) { |
555 | as_batch_queue* bq = &batch_queues[index]; |
556 | |
557 | if (bq->active && cf_queue_sz(bq->response_queue) < g_config.batch_max_buffers_per_queue) { |
558 | return bq; |
559 | } |
560 | } |
561 | |
562 | // Search forwards. |
563 | for (int index = queue_index + 1; index < MAX_BATCH_THREADS; index++) { |
564 | as_batch_queue* bq = &batch_queues[index]; |
565 | |
566 | // If current queue is not active, future queues will not be active either. |
567 | if (! bq->active) { |
568 | break; |
569 | } |
570 | |
571 | if (cf_queue_sz(bq->response_queue) < g_config.batch_max_buffers_per_queue) { |
572 | return bq; |
573 | } |
574 | } |
575 | return 0; |
576 | } |
577 | |
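// Allocate a new buffer of the given gross size, which includes the
// as_batch_buffer header accounted by the pool's header_size.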
578 | static as_batch_buffer* |
579 | as_batch_buffer_create(uint32_t size) |
580 | { |
581 | as_batch_buffer* buffer = cf_malloc(size); |
582 | buffer->capacity = size - batch_buffer_pool.header_size; |
583 | cf_atomic64_incr(&g_stats.batch_index_created_buffers); |
584 | return buffer; |
585 | } |
586 | |
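// Get a buffer with room for size bytes of response data - reuse a pooled
// buffer when possible, or allocate a new one. Oversized requests get a
// dedicated buffer that is never put back into the pool. Called with
// shared->lock held.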
587 | static uint8_t* |
588 | as_batch_buffer_pop(as_batch_shared* shared, uint32_t size) |
589 | { |
590 | as_batch_buffer* buffer; |
591 | uint32_t mem_size = size + batch_buffer_pool.header_size; |
592 | |
593 | if (mem_size > batch_buffer_pool.buffer_size) { |
594 | // Requested size is greater than fixed buffer size. |
595 | // Allocate new buffer, but don't put back into pool. |
596 | buffer = as_batch_buffer_create(mem_size); |
597 | cf_atomic64_incr(&g_stats.batch_index_huge_buffers); |
598 | } |
599 | else { |
600 | // Pop existing buffer from queue. |
601 | // The extra lock here is unavoidable. |
602 | int status = cf_queue_pop(batch_buffer_pool.queue, &buffer, CF_QUEUE_NOWAIT); |
603 | |
604 | if (status == CF_QUEUE_OK) { |
605 | buffer->capacity = batch_buffer_pool.buffer_size - batch_buffer_pool.header_size; |
606 | } |
607 | else if (status == CF_QUEUE_EMPTY) { |
608 | // Queue is empty. Create new buffer. |
609 | buffer = as_batch_buffer_create(batch_buffer_pool.buffer_size); |
610 | } |
611 | else { |
612 | cf_crash(AS_BATCH, "Failed to pop new batch buffer: %d" , status); |
613 | } |
614 | } |
615 | |
616 | // Reserve a slot in new buffer. |
617 | buffer->size = size; |
618 | buffer->tran_count = 1; |
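	// One writer reference for the reserving transaction, plus one base
	// reference held while this is the batch's active buffer. The base
	// reference is dropped when the buffer is retired or the batch completes.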
619 | buffer->writers = 2; |
620 | shared->buffer = buffer; |
621 | return buffer->data; |
622 | } |
623 | |
624 | static inline void |
625 | as_batch_buffer_complete(as_batch_shared* shared, as_batch_buffer* buffer) |
626 | { |
627 | // Flush when all writers have finished writing into the buffer. |
628 | if (cf_atomic32_decr(&buffer->writers) == 0) { |
629 | as_batch_response response = {.shared = shared, .buffer = buffer}; |
630 | cf_queue_push(shared->response_queue, &response); |
631 | } |
632 | } |
633 | |
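// Reserve size bytes in the batch's active buffer for one transaction's
// response, switching to a new buffer when the result doesn't fit. Outputs
// the buffer written into and whether this was the batch's final
// transaction. Callers follow the pattern used by as_batch_add_result() et
// al below:
//
//	as_batch_buffer* buffer;
//	bool complete;
//	uint8_t* data = as_batch_reserve(shared, size, result_code, &buffer,
//			&complete);
//
//	if (data) {
//		// ... write response message of exactly size bytes into data ...
//	}
//
//	as_batch_transaction_end(shared, buffer, complete);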
634 | static uint8_t* |
635 | as_batch_reserve(as_batch_shared* shared, uint32_t size, int result_code, as_batch_buffer** buffer_out, bool* complete) |
636 | { |
637 | as_batch_buffer* buffer; |
638 | uint8_t* data; |
639 | |
640 | cf_mutex_lock(&shared->lock); |
641 | *complete = (++shared->tran_count == shared->tran_max); |
642 | buffer = shared->buffer; |
643 | |
644 | if (! buffer) { |
645 | // No previous buffer. Get new buffer. |
646 | data = as_batch_buffer_pop(shared, size); |
647 | *buffer_out = shared->buffer; |
648 | cf_mutex_unlock(&shared->lock); |
649 | } |
650 | else if (buffer->size + size <= buffer->capacity) { |
651 | // Result fits into existing block. Reserve a slot. |
652 | data = buffer->data + buffer->size; |
653 | buffer->size += size; |
654 | buffer->tran_count++; |
655 | cf_atomic32_incr(&buffer->writers); |
656 | *buffer_out = buffer; |
657 | cf_mutex_unlock(&shared->lock); |
658 | } |
659 | else { |
660 | // Result does not fit into existing block. |
661 | // Make copy of existing buffer. |
662 | as_batch_buffer* prev_buffer = buffer; |
663 | |
664 | // Get new buffer. |
665 | data = as_batch_buffer_pop(shared, size); |
666 | *buffer_out = shared->buffer; |
667 | cf_mutex_unlock(&shared->lock); |
668 | |
669 | as_batch_buffer_complete(shared, prev_buffer); |
670 | } |
671 | |
672 | if (! (result_code == AS_OK || result_code == AS_ERR_NOT_FOUND || |
673 | result_code == AS_ERR_FILTERED_OUT)) { |
674 | // Result code can be set outside of lock because it doesn't matter which transaction's |
675 | // result code is used as long as it's an error. |
676 | shared->result_code = result_code; |
677 | } |
678 | return data; |
679 | } |
680 | |
681 | static inline void |
682 | as_batch_transaction_end(as_batch_shared* shared, as_batch_buffer* buffer, bool complete) |
683 | { |
	// Release this transaction's writer reference. This flush can only be
	// triggered when the buffer is full, i.e. already retired by
	// as_batch_reserve(), which dropped the buffer's base reference.
	as_batch_buffer_complete(shared, buffer);

	if (complete) {
		// All transactions in the batch have been processed - release the
		// active buffer's base reference so the final buffer also flushes.
		as_batch_buffer_complete(shared, buffer);
690 | } |
691 | } |
692 | |
693 | static void |
694 | as_batch_terminate(as_batch_shared* shared, uint32_t tran_count, int result_code) |
695 | { |
696 | // Terminate batch by adding phantom transactions to shared and buffer tran counts. |
697 | // This is done so the memory is released at the end only once. |
698 | as_batch_buffer* buffer; |
699 | bool complete; |
700 | |
701 | cf_mutex_lock(&shared->lock); |
702 | buffer = shared->buffer; |
703 | shared->result_code = result_code; |
704 | shared->tran_count += tran_count; |
705 | complete = (shared->tran_count == shared->tran_max); |
706 | |
707 | if (! buffer) { |
708 | // No previous buffer. Get new buffer. |
709 | as_batch_buffer_pop(shared, 0); |
710 | buffer = shared->buffer; |
711 | buffer->tran_count = tran_count; // Override tran_count. |
712 | } |
713 | else { |
714 | // Buffer exists. Add phantom transactions. |
715 | buffer->tran_count += tran_count; |
716 | cf_atomic32_incr(&buffer->writers); |
717 | } |
718 | cf_mutex_unlock(&shared->lock); |
719 | as_batch_transaction_end(shared, buffer, complete); |
720 | } |
721 | |
722 | //--------------------------------------------------------- |
723 | // FUNCTIONS |
724 | //--------------------------------------------------------- |
725 | |
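// Initialize the batch worker threads, their response queues and the shared
// buffer pool. Called once at startup.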
726 | int |
727 | as_batch_init() |
728 | { |
729 | cf_mutex_init(&batch_resize_lock); |
730 | |
731 | // Default 'batch-index-threads' can't be set before call to cf_topo_init(). |
732 | if (g_config.n_batch_index_threads == 0) { |
733 | g_config.n_batch_index_threads = cf_topo_count_cpus(); |
734 | } |
735 | |
736 | cf_info(AS_BATCH, "starting %u batch-index-threads" , g_config.n_batch_index_threads); |
737 | |
738 | int rc = as_thread_pool_init_fixed(&batch_thread_pool, g_config.n_batch_index_threads, as_batch_worker, |
739 | sizeof(as_batch_work), offsetof(as_batch_work,complete)); |
740 | |
741 | if (rc) { |
742 | cf_warning(AS_BATCH, "Failed to initialize batch-index-threads to %u: %d" , g_config.n_batch_index_threads, rc); |
743 | return rc; |
744 | } |
745 | |
746 | rc = as_buffer_pool_init(&batch_buffer_pool, sizeof(as_batch_buffer), BATCH_BLOCK_SIZE); |
747 | |
748 | if (rc) { |
749 | cf_warning(AS_BATCH, "Failed to initialize batch buffer pool: %d" , rc); |
750 | return rc; |
751 | } |
752 | |
753 | rc = as_batch_create_thread_queues(0, g_config.n_batch_index_threads); |
754 | |
755 | if (rc) { |
756 | return rc; |
757 | } |
758 | |
759 | return 0; |
760 | } |
761 | |
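// Entry point for a batch request: validate the message, set up shared batch
// state, pick a response queue, then split the batch into individual read
// sub-transactions that are processed inline or queued.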
762 | int |
763 | as_batch_queue_task(as_transaction* btr) |
764 | { |
765 | uint64_t counter = cf_atomic64_incr(&g_stats.batch_index_initiate); |
766 | uint32_t thread_size = batch_thread_pool.thread_size; |
767 | |
768 | if (thread_size == 0 || thread_size > MAX_BATCH_THREADS) { |
769 | cf_warning(AS_BATCH, "batch-index-threads has been disabled: %d" , thread_size); |
770 | return as_batch_send_error(btr, AS_ERR_BATCH_DISABLED); |
771 | } |
772 | uint32_t queue_index = counter % thread_size; |
773 | |
774 | // Validate batch transaction |
775 | as_proto* bproto = &btr->msgp->proto; |
776 | |
777 | if (bproto->sz > PROTO_SIZE_MAX) { |
778 | cf_warning(AS_BATCH, "can't process message: invalid size %lu should be %d or less" , |
779 | (uint64_t)bproto->sz, PROTO_SIZE_MAX); |
780 | return as_batch_send_error(btr, AS_ERR_PARAMETER); |
781 | } |
782 | |
783 | if (bproto->type != PROTO_TYPE_AS_MSG) { |
784 | cf_warning(AS_BATCH, "Invalid proto type. Expected %d Received %d" , PROTO_TYPE_AS_MSG, bproto->type); |
785 | return as_batch_send_error(btr, AS_ERR_PARAMETER); |
786 | } |
787 | |
788 | // Check that the socket is authenticated. |
789 | uint8_t result = as_security_check(btr->from.proto_fd_h, PERM_NONE); |
790 | |
791 | if (result != AS_OK) { |
792 | as_security_log(btr->from.proto_fd_h, result, PERM_NONE, NULL, NULL); |
793 | return as_batch_send_error(btr, result); |
794 | } |
795 | |
796 | // Parse header |
797 | as_msg* bmsg = &btr->msgp->msg; |
798 | as_msg_swap_header(bmsg); |
799 | |
800 | // Parse fields |
801 | uint8_t* limit = (uint8_t*)bmsg + bproto->sz; |
802 | as_msg_field* mf = (as_msg_field*)bmsg->data; |
803 | as_msg_field* end; |
804 | as_msg_field* bf = 0; |
805 | as_msg_field* predexp_mf = 0; |
806 | |
807 | for (int i = 0; i < bmsg->n_fields; i++) { |
808 | if ((uint8_t*)mf >= limit) { |
809 | cf_warning(AS_BATCH, "Batch field limit reached" ); |
810 | return as_batch_send_error(btr, AS_ERR_PARAMETER); |
811 | } |
812 | as_msg_swap_field(mf); |
813 | end = as_msg_field_get_next(mf); |
814 | |
815 | if (mf->type == AS_MSG_FIELD_TYPE_BATCH || mf->type == AS_MSG_FIELD_TYPE_BATCH_WITH_SET) { |
816 | bf = mf; |
817 | } |
818 | else if (mf->type == AS_MSG_FIELD_TYPE_PREDEXP) { |
819 | predexp_mf = mf; |
820 | } |
821 | |
822 | mf = end; |
823 | } |
824 | |
825 | if (! bf) { |
826 | cf_warning(AS_BATCH, "Batch index field not found" ); |
827 | return as_batch_send_error(btr, AS_ERR_PARAMETER); |
828 | } |
829 | |
830 | // Parse batch field |
831 | uint8_t* data = bf->data; |
832 | uint32_t tran_count = cf_swap_from_be32(*(uint32_t*)data); |
833 | data += sizeof(uint32_t); |
834 | |
835 | if (tran_count == 0) { |
836 | cf_warning(AS_BATCH, "Batch request size is zero" ); |
837 | return as_batch_send_error(btr, AS_ERR_PARAMETER); |
838 | } |
839 | |
840 | if (tran_count > g_config.batch_max_requests) { |
841 | cf_warning(AS_BATCH, "Batch request size %u exceeds max %u" , tran_count, g_config.batch_max_requests); |
842 | return as_batch_send_error(btr, AS_ERR_BATCH_MAX_REQUESTS); |
843 | } |
844 | |
845 | // Initialize shared data |
846 | as_batch_shared* shared = cf_malloc(sizeof(as_batch_shared)); |
847 | |
848 | memset(shared, 0, sizeof(as_batch_shared)); |
849 | |
850 | cf_mutex_init(&shared->lock); |
851 | |
852 | // Abandon batch at twice batch timeout if batch timeout is defined. |
853 | // If batch timeout is zero, abandon after 30 seconds. |
854 | shared->end = btr->start_time + ((bmsg->transaction_ttl != 0) ? |
855 | ((uint64_t)bmsg->transaction_ttl * 1000 * 1000 * 2) : |
856 | BATCH_ABANDON_LIMIT); |
857 | |
858 | shared->start = btr->start_time; |
859 | shared->fd_h = btr->from.proto_fd_h; |
860 | shared->msgp = btr->msgp; |
861 | shared->tran_max = tran_count; |
862 | |
863 | // Find batch queue to send transaction responses. |
864 | as_batch_queue* batch_queue = &batch_queues[queue_index]; |
865 | |
866 | // batch_max_buffers_per_queue is a soft limit, but still must be checked under lock. |
867 | if (! (batch_queue->active && cf_queue_sz(batch_queue->response_queue) < g_config.batch_max_buffers_per_queue)) { |
868 | // Queue buffer limit has been exceeded or thread has been shutdown (probably due to |
869 | // downwards thread resize). Search for an available queue. |
870 | // cf_warning(AS_BATCH, "Queue %u full %d", queue_index, cf_queue_sz(batch_queue->response_queue)); |
871 | batch_queue = as_batch_find_queue(queue_index); |
872 | |
873 | if (! batch_queue) { |
874 | cf_warning(AS_BATCH, "Failed to find active batch queue that is not full" ); |
875 | cf_free(shared); |
876 | return as_batch_send_error(btr, AS_ERR_BATCH_QUEUES_FULL); |
877 | } |
878 | } |
879 | |
880 | if (predexp_mf != NULL) { |
881 | shared->predexp_mf = predexp_mf; |
882 | |
883 | if ((shared->predexp = predexp_build(predexp_mf)) == NULL) { |
884 | cf_warning(AS_BATCH, "Failed to build batch predexp" ); |
885 | cf_free(shared); |
886 | return as_batch_send_error(btr, AS_ERR_PARAMETER); |
887 | } |
888 | } |
889 | |
890 | // Increment batch queue transaction count. |
891 | cf_atomic32_incr(&batch_queue->tran_count); |
892 | shared->response_queue = batch_queue->response_queue; |
893 | |
894 | // Initialize generic transaction. |
895 | as_transaction tr; |
896 | as_transaction_init_head(&tr, 0, 0); |
897 | |
898 | tr.origin = FROM_BATCH; |
899 | tr.from_flags |= FROM_FLAG_BATCH_SUB; |
900 | tr.start_time = btr->start_time; |
901 | |
902 | // Read batch keys and initialize generic transactions. |
903 | as_batch_input* in; |
904 | cl_msg* out = NULL; |
905 | cl_msg* prev_msgp = NULL; |
906 | as_msg_op* op; |
907 | uint32_t tran_row = 0; |
	uint8_t info = *data++; // client's allow-inline flag
909 | |
910 | bool allow_inline = (g_config.n_namespaces_inlined != 0 && info); |
911 | bool check_inline = (allow_inline && g_config.n_namespaces_not_inlined != 0); |
912 | bool should_inline = (allow_inline && g_config.n_namespaces_not_inlined == 0); |
913 | |
914 | // Split batch rows into separate single record read transactions. |
915 | // The read transactions are located in the same memory block as |
916 | // the original batch transactions. This allows us to avoid performing |
917 | // an extra malloc for each transaction. |
918 | while (tran_row < tran_count && data + BATCH_REPEAT_SIZE <= limit) { |
919 | // Copy transaction data before memory gets overwritten. |
920 | in = (as_batch_input*)data; |
921 | |
922 | tr.from.batch_shared = shared; // is set NULL after sub-transaction |
923 | tr.from_data.batch_index = cf_swap_from_be32(in->index); |
924 | tr.keyd = in->keyd; |
925 | tr.benchmark_time = btr->benchmark_time; // must reset for each usage |
926 | |
927 | if (in->repeat) { |
928 | if (! prev_msgp) { |
929 | break; // bad bytes from client - repeat set on first item |
930 | } |
931 | |
932 | // Row should use previous namespace and bin names. |
933 | data += BATCH_REPEAT_SIZE; |
934 | tr.msgp = prev_msgp; |
935 | } |
936 | else { |
937 | tr.msg_fields = 0; // erase previous AS_MSG_FIELD_BIT_SET flag, if any |
938 | as_transaction_set_msg_field_flag(&tr, AS_MSG_FIELD_TYPE_NAMESPACE); |
939 | |
940 | // Row contains full namespace/bin names. |
941 | out = (cl_msg*)data; |
942 | |
943 | if (data + sizeof(cl_msg) + sizeof(as_msg_field) > limit) { |
944 | break; |
945 | } |
946 | |
947 | out->msg.header_sz = sizeof(as_msg); |
948 | out->msg.info1 = in->info1; |
949 | out->msg.info2 = 0; |
950 | out->msg.info3 = bmsg->info3 & |
951 | (AS_MSG_INFO3_SC_READ_RELAX | AS_MSG_INFO3_SC_READ_TYPE); |
952 | out->msg.unused = 0; |
953 | out->msg.result_code = 0; |
954 | out->msg.generation = 0; |
955 | out->msg.record_ttl = 0; |
956 | out->msg.transaction_ttl = bmsg->transaction_ttl; // already swapped |
957 | // n_fields/n_ops is in exact same place on both input/output, but the value still |
958 | // needs to be swapped. |
959 | out->msg.n_fields = cf_swap_from_be16(in->n_fields); |
960 | |
961 | // Older clients sent zero, but always sent namespace. Adjust this. |
962 | if (out->msg.n_fields == 0) { |
963 | out->msg.n_fields = 1; |
964 | } |
965 | |
966 | out->msg.n_ops = cf_swap_from_be16(in->n_ops); |
967 | |
968 | // Namespace input is same as namespace field, so just leave in place and swap. |
969 | data += sizeof(cl_msg); |
970 | mf = (as_msg_field*)data; |
971 | as_msg_swap_field(mf); |
972 | if (check_inline) { |
973 | as_namespace* ns = as_namespace_get_bymsgfield(mf); |
974 | should_inline = ns && ns->storage_data_in_memory; |
975 | } |
976 | mf = as_msg_field_get_next(mf); |
977 | data = (uint8_t*)mf; |
978 | |
979 | // Swap remaining fields. |
980 | for (uint16_t j = 1; j < out->msg.n_fields; j++) { |
981 | if (data + sizeof(as_msg_field) > limit) { |
982 | goto TranEnd; |
983 | } |
984 | |
985 | if (mf->type == AS_MSG_FIELD_TYPE_SET) { |
986 | as_transaction_set_msg_field_flag(&tr, AS_MSG_FIELD_TYPE_SET); |
987 | } |
988 | |
989 | as_msg_swap_field(mf); |
990 | mf = as_msg_field_get_next(mf); |
991 | data = (uint8_t*)mf; |
992 | } |
993 | |
994 | if (out->msg.n_ops) { |
995 | // Bin names input is same as transaction ops, so just leave in place and swap. |
996 | uint16_t n_ops = out->msg.n_ops; |
997 | for (uint16_t j = 0; j < n_ops; j++) { |
998 | if (data + sizeof(as_msg_op) > limit) { |
999 | goto TranEnd; |
1000 | } |
1001 | op = (as_msg_op*)data; |
1002 | as_msg_swap_op(op); |
1003 | op = as_msg_op_get_next(op); |
1004 | data = (uint8_t*)op; |
1005 | } |
1006 | } |
1007 | |
1008 | // Initialize msg header. |
1009 | out->proto.version = PROTO_VERSION; |
1010 | out->proto.type = PROTO_TYPE_AS_MSG; |
1011 | out->proto.sz = (data - (uint8_t*)&out->msg); |
1012 | tr.msgp = out; |
1013 | prev_msgp = out; |
1014 | } |
1015 | |
1016 | if (data > limit) { |
1017 | break; |
1018 | } |
1019 | |
1020 | // Submit transaction. |
1021 | if (should_inline) { |
1022 | as_tsvc_process_transaction(&tr); |
1023 | } |
1024 | else { |
1025 | // Queue transaction to be processed by a transaction thread. |
1026 | as_service_enqueue_internal(&tr); |
1027 | } |
1028 | tran_row++; |
1029 | } |
1030 | |
1031 | TranEnd: |
1032 | if (tran_row < tran_count) { |
1033 | // Mismatch between tran_count and actual data. Terminate transaction. |
1034 | cf_warning(AS_BATCH, "Batch keys mismatch. Expected %u Received %u" , tran_count, tran_row); |
1035 | as_batch_terminate(shared, tran_count - tran_row, AS_ERR_PARAMETER); |
1036 | } |
1037 | |
	// Reset original socket because the socket is now owned by batch shared.
1039 | btr->from.proto_fd_h = NULL; |
1040 | return 0; |
1041 | } |
1042 | |
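// Write one record's result into the batch response. Each record's message
// is laid out as:
//
//	as_msg header | digest field (as_msg_field + cf_digest) | n_bins ops
//
// transaction_ttl is overloaded to carry the batch index back to the client.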
1043 | void |
1044 | as_batch_add_result(as_transaction* tr, uint16_t n_bins, as_bin** bins, |
1045 | as_msg_op** ops) |
1046 | { |
1047 | as_namespace* ns = tr->rsv.ns; |
1048 | |
1049 | // Calculate size. |
1050 | size_t size = sizeof(as_msg); |
1051 | size += sizeof(as_msg_field) + sizeof(cf_digest); |
1052 | |
1053 | uint16_t n_fields = 1; |
1054 | |
1055 | for (uint16_t i = 0; i < n_bins; i++) { |
1056 | as_bin* bin = bins[i]; |
1057 | size += sizeof(as_msg_op); |
1058 | |
1059 | if (ops) { |
1060 | size += ops[i]->name_sz; |
1061 | } |
1062 | else if (bin) { |
1063 | size += ns->single_bin ? 0 : strlen(as_bin_get_name_from_id(ns, bin->id)); |
1064 | } |
1065 | else { |
1066 | cf_crash(AS_BATCH, "making response message with null bin and op" ); |
1067 | } |
1068 | |
1069 | if (bin) { |
1070 | size += as_bin_particle_client_value_size(bin); |
1071 | } |
1072 | } |
1073 | |
1074 | as_batch_shared* shared = tr->from.batch_shared; |
1075 | |
1076 | as_batch_buffer* buffer; |
1077 | bool complete; |
1078 | uint8_t* data = as_batch_reserve(shared, size, tr->result_code, &buffer, &complete); |
1079 | |
1080 | if (data) { |
1081 | // Write header. |
1082 | uint8_t* p = data; |
1083 | as_msg* m = (as_msg*)p; |
1084 | m->header_sz = sizeof(as_msg); |
1085 | m->info1 = 0; |
1086 | m->info2 = 0; |
1087 | m->info3 = 0; |
1088 | m->unused = 0; |
1089 | m->result_code = tr->result_code; |
1090 | m->generation = plain_generation(tr->generation, ns); |
1091 | m->record_ttl = tr->void_time; |
1092 | |
1093 | // Overload transaction_ttl to store batch index. |
1094 | m->transaction_ttl = tr->from_data.batch_index; |
1095 | |
1096 | m->n_fields = n_fields; |
1097 | m->n_ops = n_bins; |
1098 | as_msg_swap_header(m); |
1099 | p += sizeof(as_msg); |
1100 | |
1101 | as_msg_field* field = (as_msg_field*)p; |
1102 | field->field_sz = sizeof(cf_digest) + 1; |
1103 | field->type = AS_MSG_FIELD_TYPE_DIGEST_RIPE; |
1104 | memcpy(field->data, &tr->keyd, sizeof(cf_digest)); |
1105 | as_msg_swap_field(field); |
1106 | p += sizeof(as_msg_field) + sizeof(cf_digest); |
1107 | |
1108 | for (uint16_t i = 0; i < n_bins; i++) { |
1109 | as_bin* bin = bins[i]; |
1110 | as_msg_op* op = (as_msg_op*)p; |
1111 | op->op = AS_MSG_OP_READ; |
1112 | op->version = 0; |
1113 | |
1114 | if (ops) { |
1115 | as_msg_op* src = ops[i]; |
1116 | memcpy(op->name, src->name, src->name_sz); |
1117 | op->name_sz = src->name_sz; |
1118 | } |
1119 | else { |
1120 | op->name_sz = as_bin_memcpy_name(ns, op->name, bin); |
1121 | } |
1122 | |
1123 | op->op_sz = OP_FIXED_SZ + op->name_sz; |
1124 | p += sizeof(as_msg_op) + op->name_sz; |
1125 | p += as_bin_particle_to_client(bin, op); |
1126 | as_msg_swap_op(op); |
1127 | } |
1128 | } |
1129 | as_batch_transaction_end(shared, buffer, complete); |
1130 | } |
1131 | |
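// Write a proxied sub-transaction's response into the batch response,
// inserting the record's digest as an extra field and overloading
// transaction_ttl with the batch index.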
1132 | void |
1133 | as_batch_add_proxy_result(as_batch_shared* shared, uint32_t index, cf_digest* digest, cl_msg* cmsg, size_t proxy_size) |
1134 | { |
1135 | as_msg* msg = &cmsg->msg; |
1136 | size_t size = proxy_size + sizeof(as_msg_field) + sizeof(cf_digest) - sizeof(as_proto); |
1137 | |
1138 | as_batch_buffer* buffer; |
1139 | bool complete; |
1140 | uint8_t* data = as_batch_reserve(shared, size, msg->result_code, &buffer, &complete); |
1141 | |
1142 | if (data) { |
1143 | // Overload transaction_ttl to store batch index. |
1144 | msg->transaction_ttl = htonl(index); |
1145 | |
1146 | // Write header |
1147 | uint16_t n_fields = ntohs(msg->n_fields); |
1148 | msg->n_fields = htons(n_fields + 1); |
1149 | memcpy(data, msg, sizeof(as_msg)); |
1150 | uint8_t* trg = data + sizeof(as_msg); |
1151 | |
1152 | // Write digest field |
1153 | as_msg_field* field = (as_msg_field*)trg; |
1154 | field->field_sz = sizeof(cf_digest) + 1; |
1155 | field->type = AS_MSG_FIELD_TYPE_DIGEST_RIPE; |
1156 | memcpy(field->data, digest, sizeof(cf_digest)); |
1157 | as_msg_swap_field(field); |
1158 | trg += sizeof(as_msg_field) + sizeof(cf_digest); |
1159 | |
		// Copy other fields and ops.
1161 | size = ((uint8_t*)cmsg + proxy_size) - msg->data; |
1162 | memcpy(trg, msg->data, size); |
1163 | } |
1164 | as_batch_transaction_end(shared, buffer, complete); |
1165 | } |
1166 | |
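// Write a header-only error result (no fields, no ops) for one batch index
// into the batch response.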
1167 | void |
1168 | as_batch_add_error(as_batch_shared* shared, uint32_t index, int result_code) |
1169 | { |
1170 | as_batch_buffer* buffer; |
1171 | bool complete; |
1172 | uint8_t* data = as_batch_reserve(shared, sizeof(as_msg), result_code, &buffer, &complete); |
1173 | |
1174 | if (data) { |
1175 | // Write error. |
1176 | as_msg* m = (as_msg*)data; |
1177 | m->header_sz = sizeof(as_msg); |
1178 | m->info1 = 0; |
1179 | m->info2 = 0; |
1180 | m->info3 = 0; |
1181 | m->unused = 0; |
1182 | m->result_code = result_code; |
1183 | m->generation = 0; |
1184 | m->record_ttl = 0; |
1185 | // Overload transaction_ttl to store batch index. |
1186 | m->transaction_ttl = index; |
1187 | m->n_fields = 0; |
1188 | m->n_ops = 0; |
1189 | as_msg_swap_header(m); |
1190 | } |
1191 | as_batch_transaction_end(shared, buffer, complete); |
1192 | } |
1193 | |
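// Dynamically resize the batch thread pool and its per-thread queues. When
// growing, add threads before creating their queues; when shrinking, drain
// and shut down the excess queues before removing threads.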
1194 | int |
1195 | as_batch_threads_resize(uint32_t threads) |
1196 | { |
1197 | if (threads > MAX_BATCH_THREADS) { |
1198 | cf_warning(AS_BATCH, "batch-index-threads %u exceeds max %u" , threads, MAX_BATCH_THREADS); |
1199 | return -1; |
1200 | } |
1201 | |
1202 | cf_mutex_lock(&batch_resize_lock); |
1203 | |
1204 | // Resize thread pool. The threads will wait for graceful shutdown on downwards resize. |
1205 | uint32_t threads_orig = batch_thread_pool.thread_size; |
1206 | cf_info(AS_BATCH, "Resize batch-index-threads from %u to %u" , threads_orig, threads); |
1207 | int status = 0; |
1208 | |
1209 | if (threads != threads_orig) { |
1210 | if (threads > threads_orig) { |
1211 | // Increase threads before initializing queues. |
1212 | status = as_thread_pool_resize(&batch_thread_pool, threads); |
1213 | |
1214 | if (status == 0) { |
1215 | g_config.n_batch_index_threads = threads; |
1216 | // Adjust queues to match new thread size. |
1217 | status = as_batch_create_thread_queues(threads_orig, threads); |
1218 | } |
1219 | else { |
1220 | // Show warning, but keep going as some threads may have been successfully added/removed. |
1221 | cf_warning(AS_BATCH, "Failed to resize batch-index-threads. status=%d, batch-index-threads=%u" , |
1222 | status, g_config.n_batch_index_threads); |
1223 | threads = batch_thread_pool.thread_size; |
1224 | |
1225 | if (threads > threads_orig) { |
1226 | g_config.n_batch_index_threads = threads; |
1227 | // Adjust queues to match new thread size. |
1228 | status = as_batch_create_thread_queues(threads_orig, threads); |
1229 | } |
1230 | } |
1231 | } |
1232 | else { |
1233 | // Shutdown queues before shutting down threads. |
1234 | status = as_batch_shutdown_thread_queues(threads, threads_orig); |
1235 | |
1236 | if (status == 0) { |
1237 | // Adjust threads to match new queue size. |
1238 | status = as_thread_pool_resize(&batch_thread_pool, threads); |
1239 | g_config.n_batch_index_threads = batch_thread_pool.thread_size; |
1240 | |
1241 | if (status) { |
1242 | cf_warning(AS_BATCH, "Failed to resize batch-index-threads. status=%d, batch-index-threads=%u" , |
1243 | status, g_config.n_batch_index_threads); |
1244 | } |
1245 | } |
1246 | } |
1247 | } |
1248 | cf_mutex_unlock(&batch_resize_lock); |
1249 | return status; |
1250 | } |
1251 | |
1252 | void |
1253 | as_batch_queues_info(cf_dyn_buf* db) |
1254 | { |
1255 | cf_mutex_lock(&batch_resize_lock); |
1256 | |
1257 | uint32_t max = batch_thread_pool.thread_size; |
1258 | |
1259 | for (uint32_t i = 0; i < max; i++) { |
1260 | if (i > 0) { |
1261 | cf_dyn_buf_append_char(db, ','); |
1262 | } |
1263 | as_batch_queue* bq = &batch_queues[i]; |
1264 | cf_dyn_buf_append_uint32(db, bq->tran_count); // Batch count |
1265 | cf_dyn_buf_append_char(db, ':'); |
1266 | cf_dyn_buf_append_int(db, cf_queue_sz(bq->response_queue)); // Buffer count |
1267 | } |
1268 | cf_mutex_unlock(&batch_resize_lock); |
1269 | } |
1270 | |
1271 | int |
1272 | as_batch_unused_buffers() |
1273 | { |
1274 | return cf_queue_sz(batch_buffer_pool.queue); |
1275 | } |
1276 | |
// Not currently called. This is a placeholder in case the server decides to
// implement clean shutdowns in the future.
1279 | void |
1280 | as_batch_destroy() |
1281 | { |
1282 | as_thread_pool_destroy(&batch_thread_pool); |
1283 | as_buffer_pool_destroy(&batch_buffer_pool); |
1284 | |
1285 | cf_mutex_lock(&batch_resize_lock); |
1286 | as_batch_shutdown_thread_queues(0, batch_thread_pool.thread_size); |
1287 | cf_mutex_unlock(&batch_resize_lock); |
1288 | cf_mutex_destroy(&batch_resize_lock); |
1289 | } |
1290 | |
1291 | as_file_handle* |
1292 | as_batch_get_fd_h(as_batch_shared* shared) |
1293 | { |
1294 | return shared->fd_h; |
1295 | } |
1296 | |
1297 | as_msg_field* |
1298 | as_batch_get_predexp_mf(as_batch_shared* shared) |
1299 | { |
1300 | return shared->predexp_mf; |
1301 | } |
1302 | |
1303 | predexp_eval_t* |
1304 | as_batch_get_predexp(as_batch_shared* shared) |
1305 | { |
1306 | return shared->predexp; |
1307 | } |
1308 | |