1 | /* |
2 | * proxy.c |
3 | * |
4 | * Copyright (C) 2016 Aerospike, Inc. |
5 | * |
6 | * Portions may be licensed to Aerospike, Inc. under one or more contributor |
7 | * license agreements. |
8 | * |
9 | * This program is free software: you can redistribute it and/or modify it under |
10 | * the terms of the GNU Affero General Public License as published by the Free |
11 | * Software Foundation, either version 3 of the License, or (at your option) any |
12 | * later version. |
13 | * |
14 | * This program is distributed in the hope that it will be useful, but WITHOUT |
15 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
16 | * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more |
17 | * details. |
18 | * |
19 | * You should have received a copy of the GNU Affero General Public License |
20 | * along with this program. If not, see http://www.gnu.org/licenses/ |
21 | */ |
22 | |
23 | //========================================================== |
24 | // Includes. |
25 | // |
26 | |
27 | #include "transaction/proxy.h" |
28 | |
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
34 | |
35 | #include "citrusleaf/alloc.h" |
36 | #include "citrusleaf/cf_atomic.h" |
37 | #include "citrusleaf/cf_clock.h" |
38 | #include "citrusleaf/cf_digest.h" |
39 | |
40 | #include "cf_mutex.h" |
41 | #include "cf_thread.h" |
42 | #include "dynbuf.h" |
43 | #include "fault.h" |
44 | #include "msg.h" |
45 | #include "node.h" |
46 | #include "shash.h" |
47 | #include "socket.h" |
48 | |
49 | #include "base/batch.h" |
50 | #include "base/datamodel.h" |
51 | #include "base/health.h" |
52 | #include "base/proto.h" |
53 | #include "base/service.h" |
54 | #include "base/stats.h" |
55 | #include "base/transaction.h" |
56 | #include "fabric/exchange.h" |
57 | #include "fabric/fabric.h" |
58 | #include "fabric/partition.h" |
59 | #include "transaction/rw_request.h" |
60 | #include "transaction/rw_request_hash.h" |
61 | #include "transaction/rw_utils.h" |
62 | #include "transaction/udf.h" |
63 | |
64 | |
65 | //========================================================== |
66 | // Typedefs & constants. |
67 | // |
68 | |
// Field ids of a PROXY fabric message. These values go on the wire, so they
// are numbered explicitly - never renumber or reorder existing entries (mind
// backward compatibility), only append before NUM_PROXY_FIELDS.
typedef enum {
	PROXY_FIELD_OP = 0,			// one of the PROXY_OP_* values
	PROXY_FIELD_TID = 1,		// proxyer's transaction id
	PROXY_FIELD_DIGEST = 2,		// record digest
	PROXY_FIELD_REDIRECT = 3,	// redirect node, in return-to-sender messages
	PROXY_FIELD_AS_PROTO = 4,	// request as_proto - currently contains only as_msg's
	PROXY_FIELD_UNUSED_5 = 5,
	PROXY_FIELD_UNUSED_6 = 6,
	PROXY_FIELD_UNUSED_7 = 7,

	NUM_PROXY_FIELDS			// must stay last
} proxy_msg_field;
82 | |
// Values carried in PROXY_FIELD_OP - these go on the wire too.
#define PROXY_OP_REQUEST 1			// proxyer -> proxyee: execute this transaction
#define PROXY_OP_RESPONSE 2			// proxyee -> proxyer: transaction result
#define PROXY_OP_RETURN_TO_SENDER 3	// proxyee -> proxyer: couldn't run it here
86 | |
// Field types of PROXY fabric messages, registered with the fabric in
// as_proxy_init(). Field ids go on the wire (see proxy_msg_field), so don't
// change existing entries without considering backward compatibility.
const msg_template proxy_mt[] = {
	{ PROXY_FIELD_OP, M_FT_UINT32 },
	{ PROXY_FIELD_TID, M_FT_UINT32 },
	{ PROXY_FIELD_DIGEST, M_FT_BUF },
	{ PROXY_FIELD_REDIRECT, M_FT_UINT64 },
	{ PROXY_FIELD_AS_PROTO, M_FT_BUF },
	{ PROXY_FIELD_UNUSED_5, M_FT_UINT64 },
	{ PROXY_FIELD_UNUSED_6, M_FT_UINT32 },
	{ PROXY_FIELD_UNUSED_7, M_FT_UINT32 },
};

// The table must have as many entries as there are proxy_msg_field values.
COMPILER_ASSERT(sizeof(proxy_mt) / sizeof(msg_template) == NUM_PROXY_FIELDS);
99 | |
// Scratch size passed to as_fabric_register_msg_fn() for PROXY messages.
// NOTE(review): presumably a per-message scratch buffer size in bytes - confirm.
#define PROXY_MSG_SCRATCH_SIZE 128
101 | |
// Proxyer-side state for one diverted transaction. Stored by value in
// g_proxy_hash, keyed by the proxy tid, until a response, return-to-sender
// without redirect, or timeout finishes it.
typedef struct proxy_request_s {
	// Copied from the diverted as_transaction.
	uint32_t msg_fields;

	// Copied from the transaction - as_proxy_divert() only accepts
	// FROM_CLIENT or FROM_BATCH.
	uint8_t origin;
	uint8_t from_flags;

	union {
		void* any;
		as_file_handle* proto_fd_h;		// FROM_CLIENT
		as_batch_shared* batch_shared;	// FROM_BATCH
		// No need yet for other members of this union.
	} from;

	// No need yet for a 'from_data' union - only the batch index is kept.
	uint32_t batch_index;

	// Copied from the transaction. end_time is compared against cf_getns()
	// by the timeout thread.
	uint64_t start_time;
	uint64_t end_time;

	// The original proxy message. The hash holds a reference to it, released
	// when the request is finished.
	msg* fab_msg;

	as_namespace* ns;
} proxy_request;
126 | |
127 | |
128 | //========================================================== |
129 | // Globals. |
130 | // |
131 | |
// Outstanding proxied transactions: uint32_t tid -> proxy_request (by value).
static cf_shash* g_proxy_hash = NULL;
// Last proxy tid handed out - atomically incremented for each divert.
static cf_atomic32 g_proxy_tid = 0;
134 | |
135 | |
136 | //========================================================== |
137 | // Forward declarations. |
138 | // |
139 | |
// Timeout thread.
void* run_proxy_timeout(void* arg);
int proxy_timeout_reduce_fn(const void* key, void* data, void* udata);

// Fabric callback for PROXY messages.
int proxy_msg_cb(cf_node src, msg* m, void* udata);

// Proxyer side.
cl_msg* new_msg_w_extra_field(const cl_msg* msgp, const as_msg_field* f);
void proxyer_handle_response(msg* m, uint32_t tid);
int proxyer_handle_client_response(msg* m, proxy_request* pr);
int proxyer_handle_batch_response(msg* m, proxy_request* pr);
void proxyer_handle_return_to_sender(msg* m, uint32_t tid);

// Proxyee side.
void proxyee_handle_request(cf_node src, msg* m, uint32_t tid);
152 | |
153 | |
154 | //========================================================== |
155 | // Inlines & macros. |
156 | // |
157 | |
158 | static inline void |
159 | error_response(cf_node src, uint32_t tid, uint32_t error) |
160 | { |
161 | as_proxy_send_response(src, tid, error, 0, 0, NULL, NULL, 0, NULL, 0); |
162 | } |
163 | |
164 | static inline void |
165 | client_proxy_update_stats(as_namespace* ns, uint8_t result_code) |
166 | { |
167 | switch (result_code) { |
168 | case AS_OK: |
169 | cf_atomic64_incr(&ns->n_client_proxy_complete); |
170 | break; |
171 | case AS_ERR_TIMEOUT: |
172 | cf_atomic64_incr(&ns->n_client_proxy_timeout); |
173 | break; |
174 | default: |
175 | cf_atomic64_incr(&ns->n_client_proxy_error); |
176 | break; |
177 | } |
178 | } |
179 | |
180 | static inline void |
181 | batch_sub_proxy_update_stats(as_namespace* ns, uint8_t result_code) |
182 | { |
183 | switch (result_code) { |
184 | case AS_OK: |
185 | cf_atomic64_incr(&ns->n_batch_sub_proxy_complete); |
186 | break; |
187 | case AS_ERR_TIMEOUT: |
188 | cf_atomic64_incr(&ns->n_batch_sub_proxy_timeout); |
189 | break; |
190 | default: |
191 | cf_atomic64_incr(&ns->n_batch_sub_proxy_error); |
192 | break; |
193 | } |
194 | } |
195 | |
196 | |
197 | //========================================================== |
198 | // Public API. |
199 | // |
200 | |
201 | void |
202 | as_proxy_init() |
203 | { |
204 | g_proxy_hash = cf_shash_create(cf_shash_fn_u32, sizeof(uint32_t), |
205 | sizeof(proxy_request), 4 * 1024, CF_SHASH_MANY_LOCK); |
206 | |
207 | cf_thread_create_detached(run_proxy_timeout, NULL); |
208 | |
209 | as_fabric_register_msg_fn(M_TYPE_PROXY, proxy_mt, sizeof(proxy_mt), |
210 | PROXY_MSG_SCRATCH_SIZE, proxy_msg_cb, NULL); |
211 | } |
212 | |
213 | |
214 | uint32_t |
215 | as_proxy_hash_count() |
216 | { |
217 | return cf_shash_get_size(g_proxy_hash); |
218 | } |
219 | |
220 | |
221 | // Proxyer - divert a transaction request to another node. |
222 | void |
223 | as_proxy_divert(cf_node dst, as_transaction* tr, as_namespace* ns) |
224 | { |
225 | // Special log detail. |
226 | switch (tr->origin) { |
227 | case FROM_CLIENT: |
228 | cf_detail_digest(AS_PROXY_DIVERT, &tr->keyd, |
229 | "{%s} diverting from client %s to node %lx " , |
230 | ns->name, tr->from.proto_fd_h->client, dst); |
231 | break; |
232 | case FROM_BATCH: |
233 | cf_detail_digest(AS_PROXY_DIVERT, &tr->keyd, |
234 | "{%s} diverting batch-sub from client %s to node %lx " , |
235 | ns->name, as_batch_get_fd_h(tr->from.batch_shared)->client, |
236 | dst); |
237 | break; |
238 | default: |
239 | cf_crash(AS_PROXY, "unexpected transaction origin %u" , tr->origin); |
240 | break; |
241 | } |
242 | |
243 | // Get a fabric message and fill it out. |
244 | |
245 | msg* m = as_fabric_msg_get(M_TYPE_PROXY); |
246 | |
247 | uint32_t tid = cf_atomic32_incr(&g_proxy_tid); |
248 | |
249 | msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_REQUEST); |
250 | msg_set_uint32(m, PROXY_FIELD_TID, tid); |
251 | msg_set_buf(m, PROXY_FIELD_DIGEST, (void*)&tr->keyd, sizeof(cf_digest), |
252 | MSG_SET_COPY); |
253 | |
254 | if (tr->origin == FROM_BATCH) { |
255 | as_msg_field* f = as_batch_get_predexp_mf(tr->from.batch_shared); |
256 | |
257 | if (f == NULL) { |
258 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, (void*)tr->msgp, |
259 | sizeof(as_proto) + tr->msgp->proto.sz, MSG_SET_COPY); |
260 | } |
261 | else { |
262 | cl_msg* msgp = new_msg_w_extra_field(tr->msgp, f); |
263 | |
264 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, (void*)msgp, |
265 | sizeof(as_proto) + msgp->proto.sz, MSG_SET_HANDOFF_MALLOC); |
266 | } |
267 | } |
268 | else { |
269 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, (void*)tr->msgp, |
270 | sizeof(as_proto) + tr->msgp->proto.sz, MSG_SET_HANDOFF_MALLOC); |
271 | } |
272 | |
273 | // Set up a proxy_request and insert it in the hash. |
274 | |
275 | proxy_request pr; |
276 | |
277 | pr.msg_fields = tr->msg_fields; |
278 | |
279 | pr.origin = tr->origin; |
280 | pr.from_flags = tr->from_flags; |
281 | pr.from.any = tr->from.any; |
282 | pr.batch_index = tr->from_data.batch_index; |
283 | |
284 | pr.start_time = tr->start_time; |
285 | pr.end_time = tr->end_time; |
286 | |
287 | pr.fab_msg = m; |
288 | |
289 | pr.ns = ns; |
290 | |
291 | msg_incr_ref(m); // reference for the hash |
292 | |
293 | cf_shash_put(g_proxy_hash, &tid, &pr); |
294 | |
295 | tr->msgp = NULL; // pattern, not needed |
296 | tr->from.any = NULL; // pattern, not needed |
297 | |
298 | // Send fabric message to remote node. |
299 | |
300 | if (as_fabric_send(dst, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) { |
301 | as_fabric_msg_put(m); |
302 | } |
303 | |
304 | as_health_add_node_counter(dst, AS_HEALTH_NODE_PROXIES); |
305 | } |
306 | |
307 | |
308 | // Proxyee - transaction reservation failed here, tell proxyer to try again. |
309 | void |
310 | as_proxy_return_to_sender(const as_transaction* tr, as_namespace* ns) |
311 | { |
312 | msg* m = as_fabric_msg_get(M_TYPE_PROXY); |
313 | uint32_t pid = as_partition_getid(&tr->keyd); |
314 | cf_node redirect_node = as_partition_proxyee_redirect(ns, pid); |
315 | |
316 | msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_RETURN_TO_SENDER); |
317 | msg_set_uint32(m, PROXY_FIELD_TID, tr->from_data.proxy_tid); |
318 | msg_set_uint64(m, PROXY_FIELD_REDIRECT, |
319 | redirect_node == (cf_node)0 ? tr->from.proxy_node : redirect_node); |
320 | |
321 | if (as_fabric_send(tr->from.proxy_node, m, AS_FABRIC_CHANNEL_RW) != |
322 | AS_FABRIC_SUCCESS) { |
323 | as_fabric_msg_put(m); |
324 | } |
325 | } |
326 | |
327 | |
328 | // Proxyee - transaction completed here, send response to proxyer. |
329 | void |
330 | as_proxy_send_response(cf_node dst, uint32_t proxy_tid, uint32_t result_code, |
331 | uint32_t generation, uint32_t void_time, as_msg_op** ops, as_bin** bins, |
332 | uint16_t bin_count, as_namespace* ns, uint64_t trid) |
333 | { |
334 | msg* m = as_fabric_msg_get(M_TYPE_PROXY); |
335 | |
336 | msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_RESPONSE); |
337 | msg_set_uint32(m, PROXY_FIELD_TID, proxy_tid); |
338 | |
339 | size_t msg_sz = 0; |
340 | uint8_t* msgp = (uint8_t*)as_msg_make_response_msg(result_code, generation, |
341 | void_time, ops, bins, bin_count, ns, 0, &msg_sz, trid); |
342 | |
343 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, msgp, msg_sz, MSG_SET_HANDOFF_MALLOC); |
344 | |
345 | if (as_fabric_send(dst, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) { |
346 | as_fabric_msg_put(m); |
347 | } |
348 | } |
349 | |
350 | |
351 | // Proxyee - transaction completed here, send response to proxyer. |
352 | void |
353 | as_proxy_send_ops_response(cf_node dst, uint32_t proxy_tid, cf_dyn_buf* db) |
354 | { |
355 | msg* m = as_fabric_msg_get(M_TYPE_PROXY); |
356 | |
357 | msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_RESPONSE); |
358 | msg_set_uint32(m, PROXY_FIELD_TID, proxy_tid); |
359 | |
360 | uint8_t* msgp = db->buf; |
361 | size_t msg_sz = db->used_sz; |
362 | |
363 | if (db->is_stack) { |
364 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, msgp, msg_sz, MSG_SET_COPY); |
365 | } |
366 | else { |
367 | msg_set_buf(m, PROXY_FIELD_AS_PROTO, msgp, msg_sz, |
368 | MSG_SET_HANDOFF_MALLOC); |
369 | db->buf = NULL; // the fabric owns the buffer now |
370 | } |
371 | |
372 | if (as_fabric_send(dst, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) { |
373 | as_fabric_msg_put(m); |
374 | } |
375 | } |
376 | |
377 | |
378 | //========================================================== |
379 | // Local helpers - proxyer. |
380 | // |
381 | |
382 | cl_msg* |
383 | (const cl_msg* msgp, const as_msg_field* f) |
384 | { |
385 | size_t old_sz = sizeof(as_proto) + msgp->proto.sz; |
386 | size_t = sizeof(f->field_sz) + f->field_sz; |
387 | cl_msg* new_msgp = cf_malloc(old_sz + extra_sz); |
388 | |
389 | memcpy(new_msgp, msgp, old_sz); |
390 | memcpy((uint8_t*)new_msgp + old_sz, f, extra_sz); |
391 | |
392 | new_msgp->proto.sz += extra_sz; |
393 | new_msgp->msg.n_fields++; |
394 | |
395 | return new_msgp; |
396 | } |
397 | |
398 | void |
399 | proxyer_handle_response(msg* m, uint32_t tid) |
400 | { |
401 | proxy_request pr; |
402 | |
403 | if (cf_shash_get_and_delete(g_proxy_hash, &tid, &pr) != CF_SHASH_OK) { |
404 | // Some other response (or timeout) has already finished this pr. |
405 | return; |
406 | } |
407 | |
408 | cf_assert(pr.from.any, AS_PROXY, "origin %u has null 'from'" , pr.origin); |
409 | |
410 | int result; |
411 | |
412 | switch (pr.origin) { |
413 | case FROM_CLIENT: |
414 | result = proxyer_handle_client_response(m, &pr); |
415 | client_proxy_update_stats(pr.ns, result); |
416 | break; |
417 | case FROM_BATCH: |
418 | result = proxyer_handle_batch_response(m, &pr); |
419 | batch_sub_proxy_update_stats(pr.ns, result); |
420 | // Note - no worries about msgp, proxy divert copied it. |
421 | break; |
422 | default: |
423 | cf_crash(AS_PROXY, "unexpected transaction origin %u" , pr.origin); |
424 | break; |
425 | } |
426 | |
427 | pr.from.any = NULL; // pattern, not needed |
428 | |
429 | as_fabric_msg_put(pr.fab_msg); |
430 | |
431 | // Note that this includes both origins. |
432 | if (pr.ns->proxy_hist_enabled) { |
433 | histogram_insert_data_point(pr.ns->proxy_hist, pr.start_time); |
434 | } |
435 | } |
436 | |
437 | |
438 | int |
439 | proxyer_handle_client_response(msg* m, proxy_request* pr) |
440 | { |
441 | uint8_t* proto; |
442 | size_t proto_sz; |
443 | |
444 | if (msg_get_buf(m, PROXY_FIELD_AS_PROTO, &proto, &proto_sz, |
445 | MSG_GET_DIRECT) != 0) { |
446 | cf_warning(AS_PROXY, "msg get for proto failed" ); |
447 | return AS_ERR_UNKNOWN; |
448 | } |
449 | |
450 | as_file_handle* fd_h = pr->from.proto_fd_h; |
451 | |
452 | if (cf_socket_send_all(&fd_h->sock, proto, proto_sz, MSG_NOSIGNAL, |
453 | CF_SOCKET_TIMEOUT) < 0) { |
454 | // Common when a client aborts. |
455 | as_end_of_transaction_force_close(fd_h); |
456 | return AS_ERR_UNKNOWN; |
457 | } |
458 | |
459 | as_end_of_transaction_ok(fd_h); |
460 | return AS_OK; |
461 | } |
462 | |
463 | |
464 | int |
465 | proxyer_handle_batch_response(msg* m, proxy_request* pr) |
466 | { |
467 | cl_msg* msgp; |
468 | size_t msgp_sz; |
469 | |
470 | if (msg_get_buf(m, PROXY_FIELD_AS_PROTO, (uint8_t**)&msgp, &msgp_sz, |
471 | MSG_GET_DIRECT) != 0) { |
472 | cf_warning(AS_PROXY, "msg get for proto failed" ); |
473 | return AS_ERR_UNKNOWN; |
474 | } |
475 | |
476 | cf_digest* keyd; |
477 | |
478 | if (msg_get_buf(pr->fab_msg, PROXY_FIELD_DIGEST, (uint8_t**)&keyd, NULL, |
479 | MSG_GET_DIRECT) != 0) { |
480 | cf_crash(AS_PROXY, "original msg get for digest failed" ); |
481 | } |
482 | |
483 | as_batch_add_proxy_result(pr->from.batch_shared, pr->batch_index, keyd, |
484 | msgp, msgp_sz); |
485 | |
486 | return AS_OK; |
487 | } |
488 | |
489 | |
// Proxyer - the proxyee could not run the transaction and returned it.
//
// If the message carries a usable redirect node (present, not this node, and
// not zero), the original proxy message is re-sent there and the
// proxy_request stays in the hash. Otherwise the transaction is re-enqueued
// on this node and the proxy_request is removed from the hash.
//
// cf_shash_get_vlock() yields pr in place with its hash lock held, so every
// path past a successful lookup must unlock it.
void
proxyer_handle_return_to_sender(msg* m, uint32_t tid)
{
	proxy_request* pr;
	cf_mutex* lock;

	if (cf_shash_get_vlock(g_proxy_hash, &tid, (void**)&pr, &lock) !=
			CF_SHASH_OK) {
		// Some other response (or timeout) has already finished this pr.
		return;
	}

	cf_node redirect_node;

	if (msg_get_uint64(m, PROXY_FIELD_REDIRECT, &redirect_node) == 0
			&& redirect_node != g_config.self_node
			&& redirect_node != (cf_node)0) {
		// If this node was a "random" node, i.e. neither acting nor eventual
		// master, it diverts to the eventual master (the best it can do.) The
		// eventual master must inform this node about the acting master.

		// Take a reference for the send - the hash keeps its own, since the
		// proxy_request stays in the hash awaiting a response.
		msg_incr_ref(pr->fab_msg);

		if (as_fabric_send(redirect_node, pr->fab_msg, AS_FABRIC_CHANNEL_RW) !=
				AS_FABRIC_SUCCESS) {
			as_fabric_msg_put(pr->fab_msg);
		}

		cf_mutex_unlock(lock);
		return;
	}

	// No usable redirect - retry the transaction on this node.

	cf_digest* keyd;

	if (msg_get_buf(pr->fab_msg, PROXY_FIELD_DIGEST, (uint8_t**)&keyd, NULL,
			MSG_GET_DIRECT) != 0) {
		cf_crash(AS_PROXY, "original msg get for digest failed");
	}

	cl_msg* msgp;

	// TODO - inefficient! Should be a way to 'take' a buffer from msg.
	// (A copy is needed because pr->fab_msg is released below.)
	if (msg_get_buf(pr->fab_msg, PROXY_FIELD_AS_PROTO, (uint8_t**)&msgp, NULL,
			MSG_GET_COPY_MALLOC) != 0) {
		cf_crash(AS_PROXY, "original msg get for proto failed");
	}

	// Put the as_msg on the normal queue for processing.
	as_transaction tr;
	as_transaction_init_head(&tr, keyd, msgp);
	// msgp might not have digest - batch sub-transactions, old clients.
	// For old clients, will compute it again from msgp key and set.

	// Restore the transaction state saved by as_proxy_divert().
	// NOTE(review): end_time is not restored from pr - confirm it is
	// recomputed downstream.
	tr.msg_fields = pr->msg_fields;
	tr.origin = pr->origin;
	tr.from_flags = pr->from_flags;
	tr.from.any = pr->from.any;
	tr.from_data.batch_index = pr->batch_index;
	tr.start_time = pr->start_time;

	as_service_enqueue_internal(&tr);

	// Release the hash's reference on the original message.
	as_fabric_msg_put(pr->fab_msg);

	// The entry's lock is still held, hence the lock-free delete.
	cf_shash_delete_lockfree(g_proxy_hash, &tid);
	cf_mutex_unlock(lock);
}
557 | |
558 | |
559 | //========================================================== |
560 | // Local helpers - proxyee. |
561 | // |
562 | |
563 | void |
564 | proxyee_handle_request(cf_node src, msg* m, uint32_t tid) |
565 | { |
566 | cf_digest* keyd; |
567 | |
568 | if (msg_get_buf(m, PROXY_FIELD_DIGEST, (uint8_t**)&keyd, NULL, |
569 | MSG_GET_DIRECT) != 0) { |
570 | cf_warning(AS_PROXY, "msg get for digest failed" ); |
571 | error_response(src, tid, AS_ERR_UNKNOWN); |
572 | return; |
573 | } |
574 | |
575 | cl_msg* msgp; |
576 | size_t msgp_sz; |
577 | |
578 | if (msg_get_buf(m, PROXY_FIELD_AS_PROTO, (uint8_t**)&msgp, &msgp_sz, |
579 | MSG_GET_COPY_MALLOC) != 0) { |
580 | cf_warning(AS_PROXY, "msg get for proto failed" ); |
581 | error_response(src, tid, AS_ERR_UNKNOWN); |
582 | return; |
583 | } |
584 | |
585 | // Sanity check as_proto fields. |
586 | as_proto* proto = &msgp->proto; |
587 | |
588 | if (! as_proto_wrapped_is_valid(proto, msgp_sz)) { |
589 | cf_warning(AS_PROXY, "bad proto: version %u, type %u, sz %lu [%lu]" , |
590 | proto->version, proto->type, (uint64_t)proto->sz, msgp_sz); |
591 | error_response(src, tid, AS_ERR_UNKNOWN); |
592 | return; |
593 | } |
594 | |
595 | // Put the as_msg on the normal queue for processing. |
596 | as_transaction tr; |
597 | as_transaction_init_head(&tr, keyd, msgp); |
598 | // msgp might not have digest - batch sub-transactions, old clients. |
599 | // For old clients, will compute it again from msgp key and set. |
600 | |
601 | tr.start_time = cf_getns(); |
602 | |
603 | tr.origin = FROM_PROXY; |
604 | tr.from.proxy_node = src; |
605 | tr.from_data.proxy_tid = tid; |
606 | |
607 | // Proxyer has already done byte swapping in as_msg. |
608 | if (! as_transaction_prepare(&tr, false)) { |
609 | cf_warning(AS_PROXY, "bad proxy msg" ); |
610 | error_response(src, tid, AS_ERR_UNKNOWN); |
611 | return; |
612 | } |
613 | |
614 | // For batch sub-transactions, make sure we flag them so they're not |
615 | // mistaken for multi-record transactions (which never proxy). |
616 | if (as_transaction_has_no_key_or_digest(&tr)) { |
617 | tr.from_flags |= FROM_FLAG_BATCH_SUB; |
618 | } |
619 | |
620 | as_service_enqueue_internal(&tr); |
621 | } |
622 | |
623 | |
624 | //========================================================== |
625 | // Local helpers - timeout. |
626 | // |
627 | |
628 | void* |
629 | run_proxy_timeout(void* arg) |
630 | { |
631 | while (true) { |
632 | usleep(75 * 1000); |
633 | |
634 | now_times now; |
635 | |
636 | now.now_ns = cf_getns(); |
637 | now.now_ms = now.now_ns / 1000000; |
638 | |
639 | cf_shash_reduce(g_proxy_hash, proxy_timeout_reduce_fn, &now); |
640 | } |
641 | |
642 | return NULL; |
643 | } |
644 | |
645 | |
646 | int |
647 | proxy_timeout_reduce_fn(const void* key, void* data, void* udata) |
648 | { |
649 | proxy_request* pr = data; |
650 | now_times* now = (now_times*)udata; |
651 | |
652 | if (now->now_ns < pr->end_time) { |
653 | return CF_SHASH_OK; |
654 | } |
655 | |
656 | // Handle timeouts. |
657 | |
658 | cf_assert(pr->from.any, AS_PROXY, "origin %u has null 'from'" , pr->origin); |
659 | |
660 | switch (pr->origin) { |
661 | case FROM_CLIENT: |
662 | // TODO - when it becomes important enough, find a way to echo trid. |
663 | as_msg_send_reply(pr->from.proto_fd_h, AS_ERR_TIMEOUT, 0, 0, NULL, NULL, |
664 | 0, pr->ns, 0); |
665 | client_proxy_update_stats(pr->ns, AS_ERR_TIMEOUT); |
666 | break; |
667 | case FROM_BATCH: |
668 | as_batch_add_error(pr->from.batch_shared, pr->batch_index, |
669 | AS_ERR_TIMEOUT); |
670 | // Note - no worries about msgp, proxy divert copied it. |
671 | batch_sub_proxy_update_stats(pr->ns, AS_ERR_TIMEOUT); |
672 | break; |
673 | default: |
674 | cf_crash(AS_PROXY, "unexpected transaction origin %u" , pr->origin); |
675 | break; |
676 | } |
677 | |
678 | pr->from.any = NULL; // pattern, not needed |
679 | as_fabric_msg_put(pr->fab_msg); |
680 | |
681 | return CF_SHASH_REDUCE_DELETE; |
682 | } |
683 | |
684 | |
685 | //========================================================== |
686 | // Local helpers - handle PROXY fabric messages. |
687 | // |
688 | |
689 | int |
690 | proxy_msg_cb(cf_node src, msg* m, void* udata) |
691 | { |
692 | uint32_t op; |
693 | |
694 | if (msg_get_uint32(m, PROXY_FIELD_OP, &op) != 0) { |
695 | cf_warning(AS_PROXY, "msg get for op failed" ); |
696 | as_fabric_msg_put(m); |
697 | return 0; |
698 | } |
699 | |
700 | uint32_t tid; |
701 | |
702 | if (msg_get_uint32(m, PROXY_FIELD_TID, &tid) != 0) { |
703 | cf_warning(AS_PROXY, "msg get for tid failed" ); |
704 | as_fabric_msg_put(m); |
705 | return 0; |
706 | } |
707 | |
708 | switch (op) { |
709 | case PROXY_OP_REQUEST: |
710 | proxyee_handle_request(src, m, tid); |
711 | break; |
712 | case PROXY_OP_RESPONSE: |
713 | proxyer_handle_response(m, tid); |
714 | break; |
715 | case PROXY_OP_RETURN_TO_SENDER: |
716 | proxyer_handle_return_to_sender(m, tid); |
717 | break; |
718 | default: |
719 | cf_warning(AS_PROXY, "received unexpected message op %u" , op); |
720 | break; |
721 | } |
722 | |
723 | as_fabric_msg_put(m); |
724 | return 0; |
725 | } |
726 | |