1 | /* |
2 | * proto.c |
3 | * |
4 | * Copyright (C) 2008-2015 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "base/proto.h" |
28 | |
29 | #include <errno.h> |
30 | #include <stdbool.h> |
31 | #include <stddef.h> |
32 | #include <stdint.h> |
33 | #include <string.h> |
34 | #include <unistd.h> |
35 | |
36 | #include "aerospike/as_val.h" |
37 | #include "citrusleaf/alloc.h" |
38 | #include "citrusleaf/cf_byte_order.h" |
39 | #include "citrusleaf/cf_digest.h" |
40 | #include "citrusleaf/cf_queue.h" |
41 | #include "citrusleaf/cf_vector.h" |
42 | |
43 | #include "cf_thread.h" |
44 | #include "dynbuf.h" |
45 | #include "fault.h" |
46 | #include "socket.h" |
47 | |
48 | #include "base/as_stap.h" |
49 | #include "base/datamodel.h" |
50 | #include "base/index.h" |
51 | #include "base/thr_tsvc.h" |
52 | #include "base/transaction.h" |
53 | #include "storage/storage.h" |
54 | |
55 | |
56 | //========================================================== |
57 | // Typedefs & constants. |
58 | // |
59 | |
// Size of the on-stack buffer that as_msg_send_reply() offers to
// as_msg_make_response_msg() before a heap allocation is needed.
#define MSG_STACK_BUFFER_SZ (1024 * 16)
// Retry threshold for non-blocking sends in netio_send_packet() - once the
// retry count exceeds this, the send is handed back as AS_NETIO_CONTINUE.
#define NETIO_MAX_IO_RETRY 5

// Names of the single special bin written by the as_val response builders,
// chosen by their 'success' argument.
static const char SUCCESS_BIN_NAME[] = "SUCCESS";
static const char FAILURE_BIN_NAME[] = "FAILURE";
65 | |
66 | |
67 | //========================================================== |
68 | // Globals. |
69 | // |
70 | |
// Queues of as_netio objects waiting for another send attempt. Both are
// initialized in as_netio_init(), which also starts one run_netio() thread per
// queue. as_netio_send() pushes to the slow queue when its 'slow' argument is
// true, otherwise to the regular queue.
static cf_queue g_netio_queue;
static cf_queue g_netio_slow_queue;
73 | |
74 | |
75 | //========================================================== |
76 | // Forward declarations. |
77 | // |
78 | |
// Sends a complete response buffer on a client connection and ends the
// transaction on that connection.
static int send_reply_buf(as_file_handle *fd_h, uint8_t *msgp, size_t msg_sz);
// Thread body - pops as_netio objects from the queue passed in and re-sends.
static void *run_netio(void *q_to_wait_on);
// Sends buf-builder contents starting at *offset, returns an AS_NETIO_* code.
static int netio_send_packet(as_file_handle *fd_h, cf_buf_builder *bb_r, uint32_t *offset, bool blocking);
82 | |
83 | |
84 | //========================================================== |
85 | // Public API - byte swapping. |
86 | // |
87 | |
88 | void |
89 | as_proto_swap(as_proto *proto) |
90 | { |
91 | uint8_t version = proto->version; |
92 | uint8_t type = proto->type; |
93 | |
94 | proto->version = proto->type = 0; |
95 | proto->sz = cf_swap_from_be64(*(uint64_t *)proto); |
96 | proto->version = version; |
97 | proto->type = type; |
98 | } |
99 | |
100 | void |
101 | (as_msg *m) |
102 | { |
103 | m->generation = cf_swap_from_be32(m->generation); |
104 | m->record_ttl = cf_swap_from_be32(m->record_ttl); |
105 | m->transaction_ttl = cf_swap_from_be32(m->transaction_ttl); |
106 | m->n_fields = cf_swap_from_be16(m->n_fields); |
107 | m->n_ops = cf_swap_from_be16(m->n_ops); |
108 | } |
109 | |
110 | void |
111 | as_msg_swap_field(as_msg_field *mf) |
112 | { |
113 | mf->field_sz = cf_swap_from_be32(mf->field_sz); |
114 | } |
115 | |
116 | void |
117 | as_msg_swap_op(as_msg_op *op) |
118 | { |
119 | op->op_sz = cf_swap_from_be32(op->op_sz); |
120 | } |
121 | |
122 | |
123 | //========================================================== |
124 | // Public API - generating internal transactions. |
125 | // |
126 | |
127 | // Allocates cl_msg returned - caller must free it. Everything is host-ordered. |
128 | // Will add more parameters (e.g. for set name) only as they become necessary. |
129 | cl_msg * |
130 | as_msg_create_internal(const char *ns_name, uint8_t info1, uint8_t info2, |
131 | uint8_t info3, uint16_t n_ops, uint8_t *ops, size_t ops_sz) |
132 | { |
133 | size_t ns_name_len = strlen(ns_name); |
134 | |
135 | size_t msg_sz = sizeof(cl_msg) + |
136 | sizeof(as_msg_field) + ns_name_len + |
137 | ops_sz; |
138 | |
139 | cl_msg *msgp = (cl_msg *)cf_malloc(msg_sz); |
140 | |
141 | msgp->proto.version = PROTO_VERSION; |
142 | msgp->proto.type = PROTO_TYPE_AS_MSG; |
143 | msgp->proto.sz = msg_sz - sizeof(as_proto); |
144 | |
145 | as_msg *m = &msgp->msg; |
146 | |
147 | m->header_sz = sizeof(as_msg); |
148 | m->info1 = info1; |
149 | m->info2 = info2; |
150 | m->info3 = info3; |
151 | m->unused = 0; |
152 | m->result_code = 0; |
153 | m->generation = 0; |
154 | m->record_ttl = 0; |
155 | m->transaction_ttl = 0; |
156 | m->n_fields = 1; |
157 | m->n_ops = n_ops; |
158 | |
159 | as_msg_field *mf = (as_msg_field *)(m->data); |
160 | |
161 | mf->type = AS_MSG_FIELD_TYPE_NAMESPACE; |
162 | mf->field_sz = (uint32_t)ns_name_len + 1; |
163 | memcpy(mf->data, ns_name, ns_name_len); |
164 | |
165 | if (ops != NULL) { |
166 | uint8_t *msg_ops = (uint8_t *)as_msg_field_get_next(mf); |
167 | |
168 | memcpy(msg_ops, ops, ops_sz); |
169 | } |
170 | |
171 | return msgp; |
172 | } |
173 | |
174 | |
175 | //========================================================== |
176 | // Public API - packing responses. |
177 | // |
178 | |
179 | // Allocates cl_msg returned - caller must free it. |
180 | cl_msg * |
181 | as_msg_make_response_msg(uint32_t result_code, uint32_t generation, |
182 | uint32_t void_time, as_msg_op **ops, as_bin **bins, uint16_t bin_count, |
183 | as_namespace *ns, cl_msg *msgp_in, size_t *msg_sz_in, uint64_t trid) |
184 | { |
185 | uint16_t n_fields = 0; |
186 | size_t msg_sz = sizeof(cl_msg); |
187 | |
188 | if (trid != 0) { |
189 | n_fields++; |
190 | msg_sz += sizeof(as_msg_field) + sizeof(trid); |
191 | } |
192 | |
193 | msg_sz += sizeof(as_msg_op) * bin_count; |
194 | |
195 | for (uint16_t i = 0; i < bin_count; i++) { |
196 | if (ops) { |
197 | msg_sz += ops[i]->name_sz; |
198 | } |
199 | else if (bins[i]) { |
200 | msg_sz += ns->single_bin ? |
201 | 0 : strlen(as_bin_get_name_from_id(ns, bins[i]->id)); |
202 | } |
203 | else { |
204 | cf_crash(AS_PROTO, "making response message with null bin and op" ); |
205 | } |
206 | |
207 | if (bins[i]) { |
208 | msg_sz += as_bin_particle_client_value_size(bins[i]); |
209 | } |
210 | } |
211 | |
212 | uint8_t *buf; |
213 | |
214 | if (! msgp_in || *msg_sz_in < msg_sz) { |
215 | buf = cf_malloc(msg_sz); |
216 | } |
217 | else { |
218 | buf = (uint8_t *)msgp_in; |
219 | } |
220 | |
221 | *msg_sz_in = msg_sz; |
222 | |
223 | cl_msg *msgp = (cl_msg *)buf; |
224 | |
225 | msgp->proto.version = PROTO_VERSION; |
226 | msgp->proto.type = PROTO_TYPE_AS_MSG; |
227 | msgp->proto.sz = msg_sz - sizeof(as_proto); |
228 | |
229 | as_proto_swap(&msgp->proto); |
230 | |
231 | as_msg *m = &msgp->msg; |
232 | |
233 | m->header_sz = sizeof(as_msg); |
234 | m->info1 = 0; |
235 | m->info2 = 0; |
236 | m->info3 = 0; |
237 | m->unused = 0; |
238 | m->result_code = result_code; |
239 | m->generation = generation == 0 ? 0 : plain_generation(generation, ns); |
240 | m->record_ttl = void_time; |
241 | m->transaction_ttl = 0; |
242 | m->n_fields = n_fields; |
243 | m->n_ops = bin_count; |
244 | |
245 | as_msg_swap_header(m); |
246 | |
247 | buf = m->data; |
248 | |
249 | if (trid != 0) { |
250 | as_msg_field *mf = (as_msg_field *)buf; |
251 | |
252 | mf->field_sz = 1 + sizeof(uint64_t); |
253 | mf->type = AS_MSG_FIELD_TYPE_TRID; |
254 | *(uint64_t *)mf->data = cf_swap_to_be64(trid); |
255 | as_msg_swap_field(mf); |
256 | buf += sizeof(as_msg_field) + sizeof(uint64_t); |
257 | } |
258 | |
259 | for (uint16_t i = 0; i < bin_count; i++) { |
260 | as_msg_op *op = (as_msg_op *)buf; |
261 | |
262 | op->version = 0; |
263 | |
264 | if (ops) { |
265 | op->op = ops[i]->op; |
266 | memcpy(op->name, ops[i]->name, ops[i]->name_sz); |
267 | op->name_sz = ops[i]->name_sz; |
268 | } |
269 | else { |
270 | op->op = AS_MSG_OP_READ; |
271 | op->name_sz = as_bin_memcpy_name(ns, op->name, bins[i]); |
272 | } |
273 | |
274 | op->op_sz = OP_FIXED_SZ + op->name_sz; |
275 | |
276 | buf += sizeof(as_msg_op) + op->name_sz; |
277 | buf += as_bin_particle_to_client(bins[i], op); |
278 | |
279 | as_msg_swap_op(op); |
280 | } |
281 | |
282 | return msgp; |
283 | } |
284 | |
285 | // Pass NULL bb_r for sizing only. Return value is size if >= 0, error if < 0. |
286 | int32_t |
287 | as_msg_make_response_bufbuilder(cf_buf_builder **bb_r, as_storage_rd *rd, |
288 | bool no_bin_data, cf_vector *select_bins) |
289 | { |
290 | as_namespace *ns = rd->ns; |
291 | as_record *r = rd->r; |
292 | |
293 | size_t ns_len = strlen(ns->name); |
294 | const char *set_name = as_index_get_set_name(r, ns); |
295 | size_t set_name_len = set_name ? strlen(set_name) : 0; |
296 | |
297 | const uint8_t* key = NULL; |
298 | uint32_t key_size = 0; |
299 | |
300 | if (r->key_stored == 1) { |
301 | if (! as_storage_record_get_key(rd)) { |
302 | cf_warning(AS_PROTO, "can't get key - skipping record" ); |
303 | return -1; |
304 | } |
305 | |
306 | key = rd->key; |
307 | key_size = rd->key_size; |
308 | } |
309 | |
310 | uint16_t n_fields = 2; // always add namespace and digest |
311 | size_t msg_sz = sizeof(as_msg) + |
312 | sizeof(as_msg_field) + ns_len + |
313 | sizeof(as_msg_field) + sizeof(cf_digest); |
314 | |
315 | if (set_name) { |
316 | n_fields++; |
317 | msg_sz += sizeof(as_msg_field) + set_name_len; |
318 | } |
319 | |
320 | if (key) { |
321 | n_fields++; |
322 | msg_sz += sizeof(as_msg_field) + key_size; |
323 | } |
324 | |
325 | uint32_t n_select_bins = 0; |
326 | uint16_t n_bins_matched = 0; |
327 | uint16_t n_record_bins = 0; |
328 | |
329 | if (! no_bin_data) { |
330 | if (select_bins) { |
331 | n_select_bins = cf_vector_size(select_bins); |
332 | |
333 | for (uint32_t i = 0; i < n_select_bins; i++) { |
334 | char bin_name[AS_BIN_NAME_MAX_SZ]; |
335 | |
336 | cf_vector_get(select_bins, i, (void*)&bin_name); |
337 | |
338 | as_bin *b = as_bin_get(rd, bin_name); |
339 | |
340 | if (! b) { |
341 | continue; |
342 | } |
343 | |
344 | msg_sz += sizeof(as_msg_op); |
345 | msg_sz += ns->single_bin ? 0 : strlen(bin_name); |
346 | msg_sz += as_bin_particle_client_value_size(b); |
347 | |
348 | n_bins_matched++; |
349 | } |
350 | |
351 | // Don't return an empty record. |
352 | if (n_bins_matched == 0) { |
353 | return 0; |
354 | } |
355 | } |
356 | else { |
357 | n_record_bins = as_bin_inuse_count(rd); |
358 | |
359 | msg_sz += sizeof(as_msg_op) * n_record_bins; |
360 | |
361 | for (uint16_t i = 0; i < n_record_bins; i++) { |
362 | as_bin *b = &rd->bins[i]; |
363 | |
364 | msg_sz += ns->single_bin ? |
365 | 0 : strlen(as_bin_get_name_from_id(ns, b->id)); |
366 | msg_sz += (int)as_bin_particle_client_value_size(b); |
367 | } |
368 | } |
369 | } |
370 | |
371 | // NULL buf-builder means just return size. |
372 | if (! bb_r) { |
373 | return (int32_t)msg_sz; |
374 | } |
375 | |
376 | uint8_t *buf; |
377 | |
378 | cf_buf_builder_reserve(bb_r, (int)msg_sz, &buf); |
379 | |
380 | as_msg *m = (as_msg *)buf; |
381 | |
382 | m->header_sz = sizeof(as_msg); |
383 | m->info1 = no_bin_data ? AS_MSG_INFO1_GET_NO_BINS : 0; |
384 | m->info2 = 0; |
385 | m->info3 = 0; |
386 | m->unused = 0; |
387 | m->result_code = AS_OK; |
388 | m->generation = plain_generation(r->generation, ns); |
389 | m->record_ttl = r->void_time; |
390 | m->transaction_ttl = 0; |
391 | m->n_fields = n_fields; |
392 | |
393 | if (no_bin_data) { |
394 | m->n_ops = 0; |
395 | } |
396 | else { |
397 | m->n_ops = select_bins ? n_bins_matched : n_record_bins; |
398 | } |
399 | |
400 | as_msg_swap_header(m); |
401 | |
402 | buf = m->data; |
403 | |
404 | as_msg_field *mf = (as_msg_field *)buf; |
405 | |
406 | mf->field_sz = ns_len + 1; |
407 | mf->type = AS_MSG_FIELD_TYPE_NAMESPACE; |
408 | memcpy(mf->data, ns->name, ns_len); |
409 | as_msg_swap_field(mf); |
410 | buf += sizeof(as_msg_field) + ns_len; |
411 | |
412 | mf = (as_msg_field *)buf; |
413 | mf->field_sz = sizeof(cf_digest) + 1; |
414 | mf->type = AS_MSG_FIELD_TYPE_DIGEST_RIPE; |
415 | memcpy(mf->data, &r->keyd, sizeof(cf_digest)); |
416 | as_msg_swap_field(mf); |
417 | buf += sizeof(as_msg_field) + sizeof(cf_digest); |
418 | |
419 | if (set_name) { |
420 | mf = (as_msg_field *)buf; |
421 | mf->field_sz = set_name_len + 1; |
422 | mf->type = AS_MSG_FIELD_TYPE_SET; |
423 | memcpy(mf->data, set_name, set_name_len); |
424 | as_msg_swap_field(mf); |
425 | buf += sizeof(as_msg_field) + set_name_len; |
426 | } |
427 | |
428 | if (key) { |
429 | mf = (as_msg_field *)buf; |
430 | mf->field_sz = key_size + 1; |
431 | mf->type = AS_MSG_FIELD_TYPE_KEY; |
432 | memcpy(mf->data, key, key_size); |
433 | as_msg_swap_field(mf); |
434 | buf += sizeof(as_msg_field) + key_size; |
435 | } |
436 | |
437 | if (no_bin_data) { |
438 | return (int32_t)msg_sz; |
439 | } |
440 | |
441 | if (select_bins) { |
442 | for (uint32_t i = 0; i < n_select_bins; i++) { |
443 | char bin_name[AS_BIN_NAME_MAX_SZ]; |
444 | |
445 | cf_vector_get(select_bins, i, (void*)&bin_name); |
446 | |
447 | as_bin *b = as_bin_get(rd, bin_name); |
448 | |
449 | if (! b) { |
450 | continue; |
451 | } |
452 | |
453 | as_msg_op *op = (as_msg_op *)buf; |
454 | |
455 | op->op = AS_MSG_OP_READ; |
456 | op->version = 0; |
457 | op->name_sz = as_bin_memcpy_name(ns, op->name, b); |
458 | op->op_sz = OP_FIXED_SZ + op->name_sz; |
459 | |
460 | buf += sizeof(as_msg_op) + op->name_sz; |
461 | buf += as_bin_particle_to_client(b, op); |
462 | |
463 | as_msg_swap_op(op); |
464 | } |
465 | } |
466 | else { |
467 | for (uint16_t i = 0; i < n_record_bins; i++) { |
468 | as_msg_op *op = (as_msg_op *)buf; |
469 | |
470 | op->op = AS_MSG_OP_READ; |
471 | op->version = 0; |
472 | op->name_sz = as_bin_memcpy_name(ns, op->name, &rd->bins[i]); |
473 | op->op_sz = OP_FIXED_SZ + op->name_sz; |
474 | |
475 | buf += sizeof(as_msg_op) + op->name_sz; |
476 | buf += as_bin_particle_to_client(&rd->bins[i], op); |
477 | |
478 | as_msg_swap_op(op); |
479 | } |
480 | } |
481 | |
482 | return (int32_t)msg_sz; |
483 | } |
484 | |
485 | cl_msg * |
486 | as_msg_make_val_response(bool success, const as_val *val, uint32_t result_code, |
487 | uint32_t generation, uint32_t void_time, uint64_t trid, |
488 | size_t *p_msg_sz) |
489 | { |
490 | const char *bin_name; |
491 | size_t bin_name_len; |
492 | |
493 | if (success) { |
494 | bin_name = SUCCESS_BIN_NAME; |
495 | bin_name_len = sizeof(SUCCESS_BIN_NAME) - 1; |
496 | } |
497 | else { |
498 | bin_name = FAILURE_BIN_NAME; |
499 | bin_name_len = sizeof(FAILURE_BIN_NAME) - 1; |
500 | } |
501 | |
502 | uint16_t n_fields = 0; |
503 | size_t msg_sz = sizeof(cl_msg); |
504 | |
505 | if (trid != 0) { |
506 | n_fields++; |
507 | msg_sz += sizeof(as_msg_field) + sizeof(trid); |
508 | } |
509 | |
510 | msg_sz += sizeof(as_msg_op) + bin_name_len + |
511 | as_particle_asval_client_value_size(val); |
512 | |
513 | uint8_t *buf = cf_malloc(msg_sz); |
514 | cl_msg *msgp = (cl_msg *)buf; |
515 | |
516 | msgp->proto.version = PROTO_VERSION; |
517 | msgp->proto.type = PROTO_TYPE_AS_MSG; |
518 | msgp->proto.sz = msg_sz - sizeof(as_proto); |
519 | |
520 | as_proto_swap(&msgp->proto); |
521 | |
522 | as_msg *m = &msgp->msg; |
523 | |
524 | m->header_sz = sizeof(as_msg); |
525 | m->info1 = 0; |
526 | m->info2 = 0; |
527 | m->info3 = 0; |
528 | m->unused = 0; |
529 | m->result_code = result_code; |
530 | m->generation = generation; |
531 | m->record_ttl = void_time; |
532 | m->transaction_ttl = 0; |
533 | m->n_fields = n_fields; |
534 | m->n_ops = 1; // only the one special bin |
535 | |
536 | as_msg_swap_header(m); |
537 | |
538 | buf = m->data; |
539 | |
540 | if (trid != 0) { |
541 | as_msg_field *mf = (as_msg_field *)buf; |
542 | |
543 | mf->field_sz = 1 + sizeof(uint64_t); |
544 | mf->type = AS_MSG_FIELD_TYPE_TRID; |
545 | *(uint64_t *)mf->data = cf_swap_to_be64(trid); |
546 | as_msg_swap_field(mf); |
547 | buf += sizeof(as_msg_field) + sizeof(uint64_t); |
548 | } |
549 | |
550 | as_msg_op *op = (as_msg_op *)buf; |
551 | |
552 | op->op = AS_MSG_OP_READ; |
553 | op->name_sz = (uint8_t)bin_name_len; |
554 | memcpy(op->name, bin_name, op->name_sz); |
555 | op->op_sz = OP_FIXED_SZ + op->name_sz; |
556 | op->version = 0; |
557 | |
558 | as_particle_asval_to_client(val, op); |
559 | |
560 | as_msg_swap_op(op); |
561 | |
562 | *p_msg_sz = msg_sz; |
563 | |
564 | return msgp; |
565 | } |
566 | |
567 | // Caller-provided val_sz must be the result of calling |
568 | // as_particle_asval_client_value_size() for same val. |
569 | void |
570 | as_msg_make_val_response_bufbuilder(const as_val *val, cf_buf_builder **bb_r, |
571 | uint32_t val_sz, bool success) |
572 | { |
573 | const char *bin_name; |
574 | size_t bin_name_len; |
575 | |
576 | if (success) { |
577 | bin_name = SUCCESS_BIN_NAME; |
578 | bin_name_len = sizeof(SUCCESS_BIN_NAME) - 1; |
579 | } |
580 | else { |
581 | bin_name = FAILURE_BIN_NAME; |
582 | bin_name_len = sizeof(FAILURE_BIN_NAME) - 1; |
583 | } |
584 | |
585 | size_t msg_sz = sizeof(as_msg) + sizeof(as_msg_op) + bin_name_len + val_sz; |
586 | |
587 | uint8_t *buf; |
588 | |
589 | cf_buf_builder_reserve(bb_r, (int)msg_sz, &buf); |
590 | |
591 | as_msg *m = (as_msg *)buf; |
592 | |
593 | m->header_sz = sizeof(as_msg); |
594 | m->info1 = 0; |
595 | m->info2 = 0; |
596 | m->info3 = 0; |
597 | m->unused = 0; |
598 | m->result_code = AS_OK; |
599 | m->generation = 0; |
600 | m->record_ttl = 0; |
601 | m->transaction_ttl = 0; |
602 | m->n_fields = 0; |
603 | m->n_ops = 1; // only the one special bin |
604 | |
605 | as_msg_swap_header(m); |
606 | |
607 | as_msg_op *op = (as_msg_op *)m->data; |
608 | |
609 | op->op = AS_MSG_OP_READ; |
610 | op->name_sz = (uint8_t)bin_name_len; |
611 | memcpy(op->name, bin_name, op->name_sz); |
612 | op->op_sz = OP_FIXED_SZ + op->name_sz; |
613 | op->version = 0; |
614 | |
615 | as_particle_asval_to_client(val, op); |
616 | |
617 | as_msg_swap_op(op); |
618 | } |
619 | |
620 | |
621 | //========================================================== |
622 | // Public API - sending responses to client. |
623 | // |
624 | |
625 | // Make an individual transaction response and send it. |
626 | int |
627 | as_msg_send_reply(as_file_handle *fd_h, uint32_t result_code, |
628 | uint32_t generation, uint32_t void_time, as_msg_op **ops, as_bin **bins, |
629 | uint16_t bin_count, as_namespace *ns, uint64_t trid) |
630 | { |
631 | uint8_t stack_buf[MSG_STACK_BUFFER_SZ]; |
632 | size_t msg_sz = sizeof(stack_buf); |
633 | uint8_t *msgp = (uint8_t *)as_msg_make_response_msg(result_code, generation, |
634 | void_time, ops, bins, bin_count, ns, (cl_msg *)stack_buf, &msg_sz, |
635 | trid); |
636 | |
637 | int rv = send_reply_buf(fd_h, msgp, msg_sz); |
638 | |
639 | if (msgp != stack_buf) { |
640 | cf_free(msgp); |
641 | } |
642 | |
643 | return rv; |
644 | } |
645 | |
646 | // Send a pre-made response saved in a dyn-buf. |
647 | int |
648 | as_msg_send_ops_reply(as_file_handle *fd_h, cf_dyn_buf *db) |
649 | { |
650 | return send_reply_buf(fd_h, db->buf, db->used_sz); |
651 | } |
652 | |
653 | // Send a blocking "fin" message with default timeout. |
654 | bool |
655 | as_msg_send_fin(cf_socket *sock, uint32_t result_code) |
656 | { |
657 | return as_msg_send_fin_timeout(sock, result_code, CF_SOCKET_TIMEOUT) != 0; |
658 | } |
659 | |
660 | // Send a blocking "fin" message with a specified timeout. |
661 | size_t |
662 | as_msg_send_fin_timeout(cf_socket *sock, uint32_t result_code, int32_t timeout) |
663 | { |
664 | cl_msg msgp; |
665 | |
666 | msgp.proto.version = PROTO_VERSION; |
667 | msgp.proto.type = PROTO_TYPE_AS_MSG; |
668 | msgp.proto.sz = sizeof(as_msg); |
669 | |
670 | as_proto_swap(&msgp.proto); |
671 | |
672 | as_msg *m = &msgp.msg; |
673 | |
674 | m->header_sz = sizeof(as_msg); |
675 | m->info1 = 0; |
676 | m->info2 = 0; |
677 | m->info3 = AS_MSG_INFO3_LAST; |
678 | m->unused = 0; |
679 | m->result_code = result_code; |
680 | m->generation = 0; |
681 | m->record_ttl = 0; |
682 | m->transaction_ttl = 0; |
683 | m->n_fields = 0; |
684 | m->n_ops = 0; |
685 | |
686 | as_msg_swap_header(m); |
687 | |
688 | if (cf_socket_send_all(sock, (uint8_t*)&msgp, sizeof(msgp), MSG_NOSIGNAL, |
689 | timeout) < 0) { |
690 | cf_warning(AS_PROTO, "send error - fd %d %s" , CSFD(sock), |
691 | cf_strerror(errno)); |
692 | return 0; |
693 | } |
694 | |
695 | return sizeof(cl_msg); |
696 | } |
697 | |
698 | |
699 | //========================================================== |
700 | // Public API - query "net-IO" responses. |
701 | // |
702 | |
703 | void |
704 | as_netio_init() |
705 | { |
706 | cf_queue_init(&g_netio_queue, sizeof(as_netio), 64, true); |
707 | cf_queue_init(&g_netio_slow_queue, sizeof(as_netio), 64, true); |
708 | |
709 | cf_thread_create_detached(run_netio, (void *)&g_netio_queue); |
710 | cf_thread_create_detached(run_netio, (void *)&g_netio_slow_queue); |
711 | } |
712 | |
713 | // Based on io object, send buffer to the network, or queue for retry. |
714 | // |
715 | // start_cb: Callback to the module before the real IO is started. Returns: |
716 | // AS_NETIO_OK: Everything ok, go ahead with IO. |
717 | // AS_NETIO_ERR: If there was issue like abort/err/timeout etc. |
718 | // |
719 | // finish_cb: Callback to module with status code of the IO call. Returns: |
720 | // AS_NETIO_OK: Everything ok. |
721 | // AS_NETIO_CONTINUE: The IO was requeued. |
722 | // AS_NETIO_ERR: IO erred out due to some issue. |
723 | // |
724 | // finish_cb should do the needful like release ref to user data etc. |
725 | // |
726 | // Returns: |
727 | // AS_NETIO_OK: Everything is fine, both start_cb & finish_cb were called. |
728 | // AS_NETIO_ERR: Something failed either calling start_cb or while doing |
729 | // network IO, finish_cb is called. |
730 | // |
731 | // This function consumes qtr reference. It calls finish_cb which releases ref |
732 | // to qtr. In case of AS_NETIO_CONTINUE: this function also consumes bb_r and |
733 | // ref for fd_h. The background thread is responsible for freeing up bb_r and |
734 | // releasing ref to fd_h. |
735 | int |
736 | as_netio_send(as_netio *io, bool slow, bool blocking) |
737 | { |
738 | int ret = io->start_cb(io, io->seq); |
739 | |
740 | if (ret == AS_NETIO_OK) { |
741 | ret = io->finish_cb(io, netio_send_packet(io->fd_h, io->bb_r, |
742 | &io->offset, blocking)); |
743 | } |
744 | else { |
745 | ret = io->finish_cb(io, ret); |
746 | } |
747 | |
748 | // If needs requeue then requeue it. |
749 | switch (ret) { |
750 | case AS_NETIO_CONTINUE: |
751 | if (slow) { |
752 | io->slow = true; |
753 | cf_queue_push(&g_netio_slow_queue, io); |
754 | } |
755 | else { |
756 | cf_queue_push(&g_netio_queue, io); |
757 | } |
758 | break; |
759 | default: |
760 | ret = AS_NETIO_OK; |
761 | break; |
762 | } |
763 | |
764 | return ret; |
765 | } |
766 | |
767 | |
768 | //========================================================== |
769 | // Local helpers. |
770 | // |
771 | |
772 | static int |
773 | send_reply_buf(as_file_handle *fd_h, uint8_t *msgp, size_t msg_sz) |
774 | { |
775 | cf_assert(cf_socket_exists(&fd_h->sock), AS_PROTO, "fd is invalid" ); |
776 | |
777 | if (cf_socket_send_all(&fd_h->sock, msgp, msg_sz, MSG_NOSIGNAL, |
778 | CF_SOCKET_TIMEOUT) < 0) { |
779 | // Common when a client aborts. |
780 | cf_debug(AS_PROTO, "protocol write fail: fd %d sz %zu errno %d" , |
781 | CSFD(&fd_h->sock), msg_sz, errno); |
782 | |
783 | as_end_of_transaction_force_close(fd_h); |
784 | return -1; |
785 | } |
786 | |
787 | as_end_of_transaction_ok(fd_h); |
788 | return 0; |
789 | } |
790 | |
791 | static void * |
792 | run_netio(void *q_to_wait_on) |
793 | { |
794 | cf_queue *q = (cf_queue*)q_to_wait_on; |
795 | |
796 | while (true) { |
797 | as_netio io; |
798 | |
799 | if (cf_queue_pop(q, &io, CF_QUEUE_FOREVER) != 0) { |
800 | cf_crash(AS_PROTO, "failed to pop from IO worker queue." ); |
801 | } |
802 | |
803 | if (io.slow) { |
804 | usleep(g_config.proto_slow_netio_sleep_ms * 1000); |
805 | } |
806 | |
807 | as_netio_send(&io, true, false); |
808 | } |
809 | |
810 | return NULL; |
811 | } |
812 | |
813 | static int |
814 | netio_send_packet(as_file_handle *fd_h, cf_buf_builder *bb_r, uint32_t *offset, |
815 | bool blocking) |
816 | { |
817 | #if defined(USE_SYSTEMTAP) |
818 | uint64_t nodeid = g_config.self_node; |
819 | #endif |
820 | |
821 | uint32_t len = bb_r->used_sz; |
822 | uint8_t *buf = bb_r->buf; |
823 | |
824 | as_proto proto; |
825 | |
826 | proto.version = PROTO_VERSION; |
827 | proto.type = PROTO_TYPE_AS_MSG; |
828 | proto.sz = len - 8; |
829 | as_proto_swap(&proto); |
830 | |
831 | memcpy(bb_r->buf, &proto, 8); |
832 | |
833 | uint32_t pos = *offset; |
834 | |
835 | ASD_QUERY_SENDPACKET_STARTING(nodeid, pos, len); |
836 | |
837 | int retry = 0; |
838 | |
839 | cf_detail(AS_PROTO," start at %p %d %d" , buf, pos, len); |
840 | |
841 | while (pos < len) { |
842 | int rv = cf_socket_send(&fd_h->sock, buf + pos, len - pos, |
843 | MSG_NOSIGNAL); |
844 | |
845 | if (rv <= 0) { |
846 | if (errno != EAGAIN) { |
847 | cf_debug(AS_PROTO, "packet send response error returned %d errno %d fd %d" , |
848 | rv, errno, CSFD(&fd_h->sock)); |
849 | return AS_NETIO_IO_ERR; |
850 | } |
851 | |
852 | if (! blocking && (retry > NETIO_MAX_IO_RETRY)) { |
853 | *offset = pos; |
854 | cf_detail(AS_PROTO," end at %p %d %d" , buf, pos, len); |
855 | ASD_QUERY_SENDPACKET_CONTINUE(nodeid, pos); |
856 | return AS_NETIO_CONTINUE; |
857 | } |
858 | |
859 | retry++; |
860 | // bigger packets so try few extra times |
861 | usleep(100); |
862 | } |
863 | else { |
864 | pos += rv; |
865 | } |
866 | } |
867 | |
868 | ASD_QUERY_SENDPACKET_FINISHED(nodeid); |
869 | return AS_NETIO_OK; |
870 | } |
871 | |