1/*
2 * proto.c
3 *
4 * Copyright (C) 2008-2015 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "base/proto.h"
28
29#include <errno.h>
30#include <stdbool.h>
31#include <stddef.h>
32#include <stdint.h>
33#include <string.h>
34#include <unistd.h>
35
36#include "aerospike/as_val.h"
37#include "citrusleaf/alloc.h"
38#include "citrusleaf/cf_byte_order.h"
39#include "citrusleaf/cf_digest.h"
40#include "citrusleaf/cf_queue.h"
41#include "citrusleaf/cf_vector.h"
42
43#include "cf_thread.h"
44#include "dynbuf.h"
45#include "fault.h"
46#include "socket.h"
47
48#include "base/as_stap.h"
49#include "base/datamodel.h"
50#include "base/index.h"
51#include "base/thr_tsvc.h"
52#include "base/transaction.h"
53#include "storage/storage.h"
54
55
56//==========================================================
57// Typedefs & constants.
58//
59
// Size in bytes of the on-stack scratch buffer that as_msg_send_reply() offers
// to as_msg_make_response_msg() for building a response.
60#define MSG_STACK_BUFFER_SZ (1024 * 16)
// Retry threshold for netio_send_packet(): once a non-blocking call has retried
// more than this many times on EAGAIN, it returns AS_NETIO_CONTINUE.
61#define NETIO_MAX_IO_RETRY 5
62
// Names of the single bin carried by the responses built in
// as_msg_make_val_response() and as_msg_make_val_response_bufbuilder() - which
// one is used depends on those functions' 'success' argument.
63static const char SUCCESS_BIN_NAME[] = "SUCCESS";
64static const char FAILURE_BIN_NAME[] = "FAILURE";
65
66
67//==========================================================
68// Globals.
69//
70
// Queue of as_netio items waiting for another send attempt. as_netio_send()
// pushes here when called with slow == false; a run_netio thread drains it.
71static cf_queue g_netio_queue;
// Queue used when as_netio_send() is called with slow == true. Items pushed
// here are marked io->slow, so the run_netio thread sleeps before retrying.
72static cf_queue g_netio_slow_queue;
73
74
75//==========================================================
76// Forward declarations.
77//
78
// Sends a complete response buffer on a client connection, then ends the
// transaction on that file handle.
79static int send_reply_buf(as_file_handle *fd_h, uint8_t *msgp, size_t msg_sz);
// Thread body - pops as_netio items off the queue it is given and resends them.
80static void *run_netio(void *q_to_wait_on);
// Writes a buf-builder's contents to a socket, resuming from *offset.
81static int netio_send_packet(as_file_handle *fd_h, cf_buf_builder *bb_r, uint32_t *offset, bool blocking);
82
83
84//==========================================================
85// Public API - byte swapping.
86//
87
88void
89as_proto_swap(as_proto *proto)
90{
91 uint8_t version = proto->version;
92 uint8_t type = proto->type;
93
94 proto->version = proto->type = 0;
95 proto->sz = cf_swap_from_be64(*(uint64_t *)proto);
96 proto->version = version;
97 proto->type = type;
98}
99
100void
101as_msg_swap_header(as_msg *m)
102{
103 m->generation = cf_swap_from_be32(m->generation);
104 m->record_ttl = cf_swap_from_be32(m->record_ttl);
105 m->transaction_ttl = cf_swap_from_be32(m->transaction_ttl);
106 m->n_fields = cf_swap_from_be16(m->n_fields);
107 m->n_ops = cf_swap_from_be16(m->n_ops);
108}
109
110void
111as_msg_swap_field(as_msg_field *mf)
112{
113 mf->field_sz = cf_swap_from_be32(mf->field_sz);
114}
115
116void
117as_msg_swap_op(as_msg_op *op)
118{
119 op->op_sz = cf_swap_from_be32(op->op_sz);
120}
121
122
123//==========================================================
124// Public API - generating internal transactions.
125//
126
127// Allocates cl_msg returned - caller must free it. Everything is host-ordered.
128// Will add more parameters (e.g. for set name) only as they become necessary.
129cl_msg *
130as_msg_create_internal(const char *ns_name, uint8_t info1, uint8_t info2,
131 uint8_t info3, uint16_t n_ops, uint8_t *ops, size_t ops_sz)
132{
133 size_t ns_name_len = strlen(ns_name);
134
135 size_t msg_sz = sizeof(cl_msg) +
136 sizeof(as_msg_field) + ns_name_len +
137 ops_sz;
138
139 cl_msg *msgp = (cl_msg *)cf_malloc(msg_sz);
140
141 msgp->proto.version = PROTO_VERSION;
142 msgp->proto.type = PROTO_TYPE_AS_MSG;
143 msgp->proto.sz = msg_sz - sizeof(as_proto);
144
145 as_msg *m = &msgp->msg;
146
147 m->header_sz = sizeof(as_msg);
148 m->info1 = info1;
149 m->info2 = info2;
150 m->info3 = info3;
151 m->unused = 0;
152 m->result_code = 0;
153 m->generation = 0;
154 m->record_ttl = 0;
155 m->transaction_ttl = 0;
156 m->n_fields = 1;
157 m->n_ops = n_ops;
158
159 as_msg_field *mf = (as_msg_field *)(m->data);
160
161 mf->type = AS_MSG_FIELD_TYPE_NAMESPACE;
162 mf->field_sz = (uint32_t)ns_name_len + 1;
163 memcpy(mf->data, ns_name, ns_name_len);
164
165 if (ops != NULL) {
166 uint8_t *msg_ops = (uint8_t *)as_msg_field_get_next(mf);
167
168 memcpy(msg_ops, ops, ops_sz);
169 }
170
171 return msgp;
172}
173
174
175//==========================================================
176// Public API - packing responses.
177//
178
179// Allocates cl_msg returned - caller must free it.
180cl_msg *
181as_msg_make_response_msg(uint32_t result_code, uint32_t generation,
182 uint32_t void_time, as_msg_op **ops, as_bin **bins, uint16_t bin_count,
183 as_namespace *ns, cl_msg *msgp_in, size_t *msg_sz_in, uint64_t trid)
184{
185 uint16_t n_fields = 0;
186 size_t msg_sz = sizeof(cl_msg);
187
188 if (trid != 0) {
189 n_fields++;
190 msg_sz += sizeof(as_msg_field) + sizeof(trid);
191 }
192
193 msg_sz += sizeof(as_msg_op) * bin_count;
194
195 for (uint16_t i = 0; i < bin_count; i++) {
196 if (ops) {
197 msg_sz += ops[i]->name_sz;
198 }
199 else if (bins[i]) {
200 msg_sz += ns->single_bin ?
201 0 : strlen(as_bin_get_name_from_id(ns, bins[i]->id));
202 }
203 else {
204 cf_crash(AS_PROTO, "making response message with null bin and op");
205 }
206
207 if (bins[i]) {
208 msg_sz += as_bin_particle_client_value_size(bins[i]);
209 }
210 }
211
212 uint8_t *buf;
213
214 if (! msgp_in || *msg_sz_in < msg_sz) {
215 buf = cf_malloc(msg_sz);
216 }
217 else {
218 buf = (uint8_t *)msgp_in;
219 }
220
221 *msg_sz_in = msg_sz;
222
223 cl_msg *msgp = (cl_msg *)buf;
224
225 msgp->proto.version = PROTO_VERSION;
226 msgp->proto.type = PROTO_TYPE_AS_MSG;
227 msgp->proto.sz = msg_sz - sizeof(as_proto);
228
229 as_proto_swap(&msgp->proto);
230
231 as_msg *m = &msgp->msg;
232
233 m->header_sz = sizeof(as_msg);
234 m->info1 = 0;
235 m->info2 = 0;
236 m->info3 = 0;
237 m->unused = 0;
238 m->result_code = result_code;
239 m->generation = generation == 0 ? 0 : plain_generation(generation, ns);
240 m->record_ttl = void_time;
241 m->transaction_ttl = 0;
242 m->n_fields = n_fields;
243 m->n_ops = bin_count;
244
245 as_msg_swap_header(m);
246
247 buf = m->data;
248
249 if (trid != 0) {
250 as_msg_field *mf = (as_msg_field *)buf;
251
252 mf->field_sz = 1 + sizeof(uint64_t);
253 mf->type = AS_MSG_FIELD_TYPE_TRID;
254 *(uint64_t *)mf->data = cf_swap_to_be64(trid);
255 as_msg_swap_field(mf);
256 buf += sizeof(as_msg_field) + sizeof(uint64_t);
257 }
258
259 for (uint16_t i = 0; i < bin_count; i++) {
260 as_msg_op *op = (as_msg_op *)buf;
261
262 op->version = 0;
263
264 if (ops) {
265 op->op = ops[i]->op;
266 memcpy(op->name, ops[i]->name, ops[i]->name_sz);
267 op->name_sz = ops[i]->name_sz;
268 }
269 else {
270 op->op = AS_MSG_OP_READ;
271 op->name_sz = as_bin_memcpy_name(ns, op->name, bins[i]);
272 }
273
274 op->op_sz = OP_FIXED_SZ + op->name_sz;
275
276 buf += sizeof(as_msg_op) + op->name_sz;
277 buf += as_bin_particle_to_client(bins[i], op);
278
279 as_msg_swap_op(op);
280 }
281
282 return msgp;
283}
284
285// Pass NULL bb_r for sizing only. Return value is size if >= 0, error if < 0.
286int32_t
287as_msg_make_response_bufbuilder(cf_buf_builder **bb_r, as_storage_rd *rd,
288 bool no_bin_data, cf_vector *select_bins)
289{
290 as_namespace *ns = rd->ns;
291 as_record *r = rd->r;
292
293 size_t ns_len = strlen(ns->name);
294 const char *set_name = as_index_get_set_name(r, ns);
295 size_t set_name_len = set_name ? strlen(set_name) : 0;
296
297 const uint8_t* key = NULL;
298 uint32_t key_size = 0;
299
300 if (r->key_stored == 1) {
301 if (! as_storage_record_get_key(rd)) {
302 cf_warning(AS_PROTO, "can't get key - skipping record");
303 return -1;
304 }
305
306 key = rd->key;
307 key_size = rd->key_size;
308 }
309
310 uint16_t n_fields = 2; // always add namespace and digest
311 size_t msg_sz = sizeof(as_msg) +
312 sizeof(as_msg_field) + ns_len +
313 sizeof(as_msg_field) + sizeof(cf_digest);
314
315 if (set_name) {
316 n_fields++;
317 msg_sz += sizeof(as_msg_field) + set_name_len;
318 }
319
320 if (key) {
321 n_fields++;
322 msg_sz += sizeof(as_msg_field) + key_size;
323 }
324
325 uint32_t n_select_bins = 0;
326 uint16_t n_bins_matched = 0;
327 uint16_t n_record_bins = 0;
328
329 if (! no_bin_data) {
330 if (select_bins) {
331 n_select_bins = cf_vector_size(select_bins);
332
333 for (uint32_t i = 0; i < n_select_bins; i++) {
334 char bin_name[AS_BIN_NAME_MAX_SZ];
335
336 cf_vector_get(select_bins, i, (void*)&bin_name);
337
338 as_bin *b = as_bin_get(rd, bin_name);
339
340 if (! b) {
341 continue;
342 }
343
344 msg_sz += sizeof(as_msg_op);
345 msg_sz += ns->single_bin ? 0 : strlen(bin_name);
346 msg_sz += as_bin_particle_client_value_size(b);
347
348 n_bins_matched++;
349 }
350
351 // Don't return an empty record.
352 if (n_bins_matched == 0) {
353 return 0;
354 }
355 }
356 else {
357 n_record_bins = as_bin_inuse_count(rd);
358
359 msg_sz += sizeof(as_msg_op) * n_record_bins;
360
361 for (uint16_t i = 0; i < n_record_bins; i++) {
362 as_bin *b = &rd->bins[i];
363
364 msg_sz += ns->single_bin ?
365 0 : strlen(as_bin_get_name_from_id(ns, b->id));
366 msg_sz += (int)as_bin_particle_client_value_size(b);
367 }
368 }
369 }
370
371 // NULL buf-builder means just return size.
372 if (! bb_r) {
373 return (int32_t)msg_sz;
374 }
375
376 uint8_t *buf;
377
378 cf_buf_builder_reserve(bb_r, (int)msg_sz, &buf);
379
380 as_msg *m = (as_msg *)buf;
381
382 m->header_sz = sizeof(as_msg);
383 m->info1 = no_bin_data ? AS_MSG_INFO1_GET_NO_BINS : 0;
384 m->info2 = 0;
385 m->info3 = 0;
386 m->unused = 0;
387 m->result_code = AS_OK;
388 m->generation = plain_generation(r->generation, ns);
389 m->record_ttl = r->void_time;
390 m->transaction_ttl = 0;
391 m->n_fields = n_fields;
392
393 if (no_bin_data) {
394 m->n_ops = 0;
395 }
396 else {
397 m->n_ops = select_bins ? n_bins_matched : n_record_bins;
398 }
399
400 as_msg_swap_header(m);
401
402 buf = m->data;
403
404 as_msg_field *mf = (as_msg_field *)buf;
405
406 mf->field_sz = ns_len + 1;
407 mf->type = AS_MSG_FIELD_TYPE_NAMESPACE;
408 memcpy(mf->data, ns->name, ns_len);
409 as_msg_swap_field(mf);
410 buf += sizeof(as_msg_field) + ns_len;
411
412 mf = (as_msg_field *)buf;
413 mf->field_sz = sizeof(cf_digest) + 1;
414 mf->type = AS_MSG_FIELD_TYPE_DIGEST_RIPE;
415 memcpy(mf->data, &r->keyd, sizeof(cf_digest));
416 as_msg_swap_field(mf);
417 buf += sizeof(as_msg_field) + sizeof(cf_digest);
418
419 if (set_name) {
420 mf = (as_msg_field *)buf;
421 mf->field_sz = set_name_len + 1;
422 mf->type = AS_MSG_FIELD_TYPE_SET;
423 memcpy(mf->data, set_name, set_name_len);
424 as_msg_swap_field(mf);
425 buf += sizeof(as_msg_field) + set_name_len;
426 }
427
428 if (key) {
429 mf = (as_msg_field *)buf;
430 mf->field_sz = key_size + 1;
431 mf->type = AS_MSG_FIELD_TYPE_KEY;
432 memcpy(mf->data, key, key_size);
433 as_msg_swap_field(mf);
434 buf += sizeof(as_msg_field) + key_size;
435 }
436
437 if (no_bin_data) {
438 return (int32_t)msg_sz;
439 }
440
441 if (select_bins) {
442 for (uint32_t i = 0; i < n_select_bins; i++) {
443 char bin_name[AS_BIN_NAME_MAX_SZ];
444
445 cf_vector_get(select_bins, i, (void*)&bin_name);
446
447 as_bin *b = as_bin_get(rd, bin_name);
448
449 if (! b) {
450 continue;
451 }
452
453 as_msg_op *op = (as_msg_op *)buf;
454
455 op->op = AS_MSG_OP_READ;
456 op->version = 0;
457 op->name_sz = as_bin_memcpy_name(ns, op->name, b);
458 op->op_sz = OP_FIXED_SZ + op->name_sz;
459
460 buf += sizeof(as_msg_op) + op->name_sz;
461 buf += as_bin_particle_to_client(b, op);
462
463 as_msg_swap_op(op);
464 }
465 }
466 else {
467 for (uint16_t i = 0; i < n_record_bins; i++) {
468 as_msg_op *op = (as_msg_op *)buf;
469
470 op->op = AS_MSG_OP_READ;
471 op->version = 0;
472 op->name_sz = as_bin_memcpy_name(ns, op->name, &rd->bins[i]);
473 op->op_sz = OP_FIXED_SZ + op->name_sz;
474
475 buf += sizeof(as_msg_op) + op->name_sz;
476 buf += as_bin_particle_to_client(&rd->bins[i], op);
477
478 as_msg_swap_op(op);
479 }
480 }
481
482 return (int32_t)msg_sz;
483}
484
485cl_msg *
486as_msg_make_val_response(bool success, const as_val *val, uint32_t result_code,
487 uint32_t generation, uint32_t void_time, uint64_t trid,
488 size_t *p_msg_sz)
489{
490 const char *bin_name;
491 size_t bin_name_len;
492
493 if (success) {
494 bin_name = SUCCESS_BIN_NAME;
495 bin_name_len = sizeof(SUCCESS_BIN_NAME) - 1;
496 }
497 else {
498 bin_name = FAILURE_BIN_NAME;
499 bin_name_len = sizeof(FAILURE_BIN_NAME) - 1;
500 }
501
502 uint16_t n_fields = 0;
503 size_t msg_sz = sizeof(cl_msg);
504
505 if (trid != 0) {
506 n_fields++;
507 msg_sz += sizeof(as_msg_field) + sizeof(trid);
508 }
509
510 msg_sz += sizeof(as_msg_op) + bin_name_len +
511 as_particle_asval_client_value_size(val);
512
513 uint8_t *buf = cf_malloc(msg_sz);
514 cl_msg *msgp = (cl_msg *)buf;
515
516 msgp->proto.version = PROTO_VERSION;
517 msgp->proto.type = PROTO_TYPE_AS_MSG;
518 msgp->proto.sz = msg_sz - sizeof(as_proto);
519
520 as_proto_swap(&msgp->proto);
521
522 as_msg *m = &msgp->msg;
523
524 m->header_sz = sizeof(as_msg);
525 m->info1 = 0;
526 m->info2 = 0;
527 m->info3 = 0;
528 m->unused = 0;
529 m->result_code = result_code;
530 m->generation = generation;
531 m->record_ttl = void_time;
532 m->transaction_ttl = 0;
533 m->n_fields = n_fields;
534 m->n_ops = 1; // only the one special bin
535
536 as_msg_swap_header(m);
537
538 buf = m->data;
539
540 if (trid != 0) {
541 as_msg_field *mf = (as_msg_field *)buf;
542
543 mf->field_sz = 1 + sizeof(uint64_t);
544 mf->type = AS_MSG_FIELD_TYPE_TRID;
545 *(uint64_t *)mf->data = cf_swap_to_be64(trid);
546 as_msg_swap_field(mf);
547 buf += sizeof(as_msg_field) + sizeof(uint64_t);
548 }
549
550 as_msg_op *op = (as_msg_op *)buf;
551
552 op->op = AS_MSG_OP_READ;
553 op->name_sz = (uint8_t)bin_name_len;
554 memcpy(op->name, bin_name, op->name_sz);
555 op->op_sz = OP_FIXED_SZ + op->name_sz;
556 op->version = 0;
557
558 as_particle_asval_to_client(val, op);
559
560 as_msg_swap_op(op);
561
562 *p_msg_sz = msg_sz;
563
564 return msgp;
565}
566
567// Caller-provided val_sz must be the result of calling
568// as_particle_asval_client_value_size() for same val.
569void
570as_msg_make_val_response_bufbuilder(const as_val *val, cf_buf_builder **bb_r,
571 uint32_t val_sz, bool success)
572{
573 const char *bin_name;
574 size_t bin_name_len;
575
576 if (success) {
577 bin_name = SUCCESS_BIN_NAME;
578 bin_name_len = sizeof(SUCCESS_BIN_NAME) - 1;
579 }
580 else {
581 bin_name = FAILURE_BIN_NAME;
582 bin_name_len = sizeof(FAILURE_BIN_NAME) - 1;
583 }
584
585 size_t msg_sz = sizeof(as_msg) + sizeof(as_msg_op) + bin_name_len + val_sz;
586
587 uint8_t *buf;
588
589 cf_buf_builder_reserve(bb_r, (int)msg_sz, &buf);
590
591 as_msg *m = (as_msg *)buf;
592
593 m->header_sz = sizeof(as_msg);
594 m->info1 = 0;
595 m->info2 = 0;
596 m->info3 = 0;
597 m->unused = 0;
598 m->result_code = AS_OK;
599 m->generation = 0;
600 m->record_ttl = 0;
601 m->transaction_ttl = 0;
602 m->n_fields = 0;
603 m->n_ops = 1; // only the one special bin
604
605 as_msg_swap_header(m);
606
607 as_msg_op *op = (as_msg_op *)m->data;
608
609 op->op = AS_MSG_OP_READ;
610 op->name_sz = (uint8_t)bin_name_len;
611 memcpy(op->name, bin_name, op->name_sz);
612 op->op_sz = OP_FIXED_SZ + op->name_sz;
613 op->version = 0;
614
615 as_particle_asval_to_client(val, op);
616
617 as_msg_swap_op(op);
618}
619
620
621//==========================================================
622// Public API - sending responses to client.
623//
624
625// Make an individual transaction response and send it.
626int
627as_msg_send_reply(as_file_handle *fd_h, uint32_t result_code,
628 uint32_t generation, uint32_t void_time, as_msg_op **ops, as_bin **bins,
629 uint16_t bin_count, as_namespace *ns, uint64_t trid)
630{
631 uint8_t stack_buf[MSG_STACK_BUFFER_SZ];
632 size_t msg_sz = sizeof(stack_buf);
633 uint8_t *msgp = (uint8_t *)as_msg_make_response_msg(result_code, generation,
634 void_time, ops, bins, bin_count, ns, (cl_msg *)stack_buf, &msg_sz,
635 trid);
636
637 int rv = send_reply_buf(fd_h, msgp, msg_sz);
638
639 if (msgp != stack_buf) {
640 cf_free(msgp);
641 }
642
643 return rv;
644}
645
646// Send a pre-made response saved in a dyn-buf.
647int
648as_msg_send_ops_reply(as_file_handle *fd_h, cf_dyn_buf *db)
649{
650 return send_reply_buf(fd_h, db->buf, db->used_sz);
651}
652
653// Send a blocking "fin" message with default timeout.
654bool
655as_msg_send_fin(cf_socket *sock, uint32_t result_code)
656{
657 return as_msg_send_fin_timeout(sock, result_code, CF_SOCKET_TIMEOUT) != 0;
658}
659
660// Send a blocking "fin" message with a specified timeout.
661size_t
662as_msg_send_fin_timeout(cf_socket *sock, uint32_t result_code, int32_t timeout)
663{
664 cl_msg msgp;
665
666 msgp.proto.version = PROTO_VERSION;
667 msgp.proto.type = PROTO_TYPE_AS_MSG;
668 msgp.proto.sz = sizeof(as_msg);
669
670 as_proto_swap(&msgp.proto);
671
672 as_msg *m = &msgp.msg;
673
674 m->header_sz = sizeof(as_msg);
675 m->info1 = 0;
676 m->info2 = 0;
677 m->info3 = AS_MSG_INFO3_LAST;
678 m->unused = 0;
679 m->result_code = result_code;
680 m->generation = 0;
681 m->record_ttl = 0;
682 m->transaction_ttl = 0;
683 m->n_fields = 0;
684 m->n_ops = 0;
685
686 as_msg_swap_header(m);
687
688 if (cf_socket_send_all(sock, (uint8_t*)&msgp, sizeof(msgp), MSG_NOSIGNAL,
689 timeout) < 0) {
690 cf_warning(AS_PROTO, "send error - fd %d %s", CSFD(sock),
691 cf_strerror(errno));
692 return 0;
693 }
694
695 return sizeof(cl_msg);
696}
697
698
699//==========================================================
700// Public API - query "net-IO" responses.
701//
702
703void
704as_netio_init()
705{
706 cf_queue_init(&g_netio_queue, sizeof(as_netio), 64, true);
707 cf_queue_init(&g_netio_slow_queue, sizeof(as_netio), 64, true);
708
709 cf_thread_create_detached(run_netio, (void *)&g_netio_queue);
710 cf_thread_create_detached(run_netio, (void *)&g_netio_slow_queue);
711}
712
713// Based on io object, send buffer to the network, or queue for retry.
714//
715// start_cb: Callback to the module before the real IO is started. Returns:
716// AS_NETIO_OK: Everything ok, go ahead with IO.
717// AS_NETIO_ERR: If there was issue like abort/err/timeout etc.
718//
719// finish_cb: Callback to module with status code of the IO call. Returns:
720// AS_NETIO_OK: Everything ok.
721// AS_NETIO_CONTINUE: The IO was requeued.
722// AS_NETIO_ERR: IO erred out due to some issue.
723//
724// finish_cb should do the needful like release ref to user data etc.
725//
726// Returns:
727// AS_NETIO_OK: Everything is fine, both start_cb & finish_cb were called.
728// AS_NETIO_ERR: Something failed either calling start_cb or while doing
729// network IO, finish_cb is called.
730//
731// This function consumes qtr reference. It calls finish_cb which releases ref
732// to qtr. In case of AS_NETIO_CONTINUE: this function also consumes bb_r and
733// ref for fd_h. The background thread is responsible for freeing up bb_r and
734// releasing ref to fd_h.
735int
736as_netio_send(as_netio *io, bool slow, bool blocking)
737{
738 int ret = io->start_cb(io, io->seq);
739
740 if (ret == AS_NETIO_OK) {
741 ret = io->finish_cb(io, netio_send_packet(io->fd_h, io->bb_r,
742 &io->offset, blocking));
743 }
744 else {
745 ret = io->finish_cb(io, ret);
746 }
747
748 // If needs requeue then requeue it.
749 switch (ret) {
750 case AS_NETIO_CONTINUE:
751 if (slow) {
752 io->slow = true;
753 cf_queue_push(&g_netio_slow_queue, io);
754 }
755 else {
756 cf_queue_push(&g_netio_queue, io);
757 }
758 break;
759 default:
760 ret = AS_NETIO_OK;
761 break;
762 }
763
764 return ret;
765}
766
767
768//==========================================================
769// Local helpers.
770//
771
772static int
773send_reply_buf(as_file_handle *fd_h, uint8_t *msgp, size_t msg_sz)
774{
775 cf_assert(cf_socket_exists(&fd_h->sock), AS_PROTO, "fd is invalid");
776
777 if (cf_socket_send_all(&fd_h->sock, msgp, msg_sz, MSG_NOSIGNAL,
778 CF_SOCKET_TIMEOUT) < 0) {
779 // Common when a client aborts.
780 cf_debug(AS_PROTO, "protocol write fail: fd %d sz %zu errno %d",
781 CSFD(&fd_h->sock), msg_sz, errno);
782
783 as_end_of_transaction_force_close(fd_h);
784 return -1;
785 }
786
787 as_end_of_transaction_ok(fd_h);
788 return 0;
789}
790
791static void *
792run_netio(void *q_to_wait_on)
793{
794 cf_queue *q = (cf_queue*)q_to_wait_on;
795
796 while (true) {
797 as_netio io;
798
799 if (cf_queue_pop(q, &io, CF_QUEUE_FOREVER) != 0) {
800 cf_crash(AS_PROTO, "failed to pop from IO worker queue.");
801 }
802
803 if (io.slow) {
804 usleep(g_config.proto_slow_netio_sleep_ms * 1000);
805 }
806
807 as_netio_send(&io, true, false);
808 }
809
810 return NULL;
811}
812
813static int
814netio_send_packet(as_file_handle *fd_h, cf_buf_builder *bb_r, uint32_t *offset,
815 bool blocking)
816{
817#if defined(USE_SYSTEMTAP)
818 uint64_t nodeid = g_config.self_node;
819#endif
820
821 uint32_t len = bb_r->used_sz;
822 uint8_t *buf = bb_r->buf;
823
824 as_proto proto;
825
826 proto.version = PROTO_VERSION;
827 proto.type = PROTO_TYPE_AS_MSG;
828 proto.sz = len - 8;
829 as_proto_swap(&proto);
830
831 memcpy(bb_r->buf, &proto, 8);
832
833 uint32_t pos = *offset;
834
835 ASD_QUERY_SENDPACKET_STARTING(nodeid, pos, len);
836
837 int retry = 0;
838
839 cf_detail(AS_PROTO," start at %p %d %d", buf, pos, len);
840
841 while (pos < len) {
842 int rv = cf_socket_send(&fd_h->sock, buf + pos, len - pos,
843 MSG_NOSIGNAL);
844
845 if (rv <= 0) {
846 if (errno != EAGAIN) {
847 cf_debug(AS_PROTO, "packet send response error returned %d errno %d fd %d",
848 rv, errno, CSFD(&fd_h->sock));
849 return AS_NETIO_IO_ERR;
850 }
851
852 if (! blocking && (retry > NETIO_MAX_IO_RETRY)) {
853 *offset = pos;
854 cf_detail(AS_PROTO," end at %p %d %d", buf, pos, len);
855 ASD_QUERY_SENDPACKET_CONTINUE(nodeid, pos);
856 return AS_NETIO_CONTINUE;
857 }
858
859 retry++;
860 // bigger packets so try few extra times
861 usleep(100);
862 }
863 else {
864 pos += rv;
865 }
866 }
867
868 ASD_QUERY_SENDPACKET_FINISHED(nodeid);
869 return AS_NETIO_OK;
870}
871