1/*
2 * proxy.c
3 *
4 * Copyright (C) 2016 Aerospike, Inc.
5 *
6 * Portions may be licensed to Aerospike, Inc. under one or more contributor
7 * license agreements.
8 *
9 * This program is free software: you can redistribute it and/or modify it under
10 * the terms of the GNU Affero General Public License as published by the Free
11 * Software Foundation, either version 3 of the License, or (at your option) any
12 * later version.
13 *
14 * This program is distributed in the hope that it will be useful, but WITHOUT
15 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
16 * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
17 * details.
18 *
19 * You should have received a copy of the GNU Affero General Public License
20 * along with this program. If not, see http://www.gnu.org/licenses/
21 */
22
23//==========================================================
24// Includes.
25//
26
27#include "transaction/proxy.h"
28
29#include <errno.h>
30#include <stdbool.h>
31#include <stddef.h>
32#include <stdint.h>
33#include <unistd.h>
34
35#include "citrusleaf/alloc.h"
36#include "citrusleaf/cf_atomic.h"
37#include "citrusleaf/cf_clock.h"
38#include "citrusleaf/cf_digest.h"
39
40#include "cf_mutex.h"
41#include "cf_thread.h"
42#include "dynbuf.h"
43#include "fault.h"
44#include "msg.h"
45#include "node.h"
46#include "shash.h"
47#include "socket.h"
48
49#include "base/batch.h"
50#include "base/datamodel.h"
51#include "base/health.h"
52#include "base/proto.h"
53#include "base/service.h"
54#include "base/stats.h"
55#include "base/transaction.h"
56#include "fabric/exchange.h"
57#include "fabric/fabric.h"
58#include "fabric/partition.h"
59#include "transaction/rw_request.h"
60#include "transaction/rw_request_hash.h"
61#include "transaction/rw_utils.h"
62#include "transaction/udf.h"
63
64
65//==========================================================
66// Typedefs & constants.
67//
68
// Field ids of a PROXY fabric message - one per entry in proxy_mt[].
typedef enum {
	// These values go on the wire, so mind backward compatibility if changing.
	PROXY_FIELD_OP = 0,       // one of the PROXY_OP_* values
	PROXY_FIELD_TID = 1,      // proxy transaction id chosen by the proxyer
	PROXY_FIELD_DIGEST = 2,   // record digest
	PROXY_FIELD_REDIRECT = 3, // node id to retry at (return-to-sender only)
	PROXY_FIELD_AS_PROTO = 4, // request as_proto - currently contains only as_msg's
	PROXY_FIELD_UNUSED_5 = 5,
	PROXY_FIELD_UNUSED_6 = 6,
	PROXY_FIELD_UNUSED_7 = 7,

	NUM_PROXY_FIELDS
} proxy_msg_field;
82
// Values carried in PROXY_FIELD_OP.
#define PROXY_OP_REQUEST          1 // proxyer -> proxyee: please run this
#define PROXY_OP_RESPONSE         2 // proxyee -> proxyer: here is the result
#define PROXY_OP_RETURN_TO_SENDER 3 // proxyee -> proxyer: try again elsewhere
86
87const msg_template proxy_mt[] = {
88 { PROXY_FIELD_OP, M_FT_UINT32 },
89 { PROXY_FIELD_TID, M_FT_UINT32 },
90 { PROXY_FIELD_DIGEST, M_FT_BUF },
91 { PROXY_FIELD_REDIRECT, M_FT_UINT64 },
92 { PROXY_FIELD_AS_PROTO, M_FT_BUF },
93 { PROXY_FIELD_UNUSED_5, M_FT_UINT64 },
94 { PROXY_FIELD_UNUSED_6, M_FT_UINT32 },
95 { PROXY_FIELD_UNUSED_7, M_FT_UINT32 },
96};
97
98COMPILER_ASSERT(sizeof(proxy_mt) / sizeof(msg_template) == NUM_PROXY_FIELDS);
99
100#define PROXY_MSG_SCRATCH_SIZE 128
101
102typedef struct proxy_request_s {
103 uint32_t msg_fields;
104
105 uint8_t origin;
106 uint8_t from_flags;
107
108 union {
109 void* any;
110 as_file_handle* proto_fd_h;
111 as_batch_shared* batch_shared;
112 // No need yet for other members of this union.
113 } from;
114
115 // No need yet for a 'from_data" union.
116 uint32_t batch_index;
117
118 uint64_t start_time;
119 uint64_t end_time;
120
121 // The original proxy message.
122 msg* fab_msg;
123
124 as_namespace* ns;
125} proxy_request;
126
127
128//==========================================================
129// Globals.
130//
131
132static cf_shash* g_proxy_hash = NULL;
133static cf_atomic32 g_proxy_tid = 0;
134
135
136//==========================================================
137// Forward declarations.
138//
139
140void* run_proxy_timeout(void* arg);
141int proxy_timeout_reduce_fn(const void* key, void* data, void* udata);
142
143int proxy_msg_cb(cf_node src, msg* m, void* udata);
144
145cl_msg* new_msg_w_extra_field(const cl_msg* msgp, const as_msg_field* f);
146void proxyer_handle_response(msg* m, uint32_t tid);
147int proxyer_handle_client_response(msg* m, proxy_request* pr);
148int proxyer_handle_batch_response(msg* m, proxy_request* pr);
149void proxyer_handle_return_to_sender(msg* m, uint32_t tid);
150
151void proxyee_handle_request(cf_node src, msg* m, uint32_t tid);
152
153
154//==========================================================
155// Inlines & macros.
156//
157
158static inline void
159error_response(cf_node src, uint32_t tid, uint32_t error)
160{
161 as_proxy_send_response(src, tid, error, 0, 0, NULL, NULL, 0, NULL, 0);
162}
163
164static inline void
165client_proxy_update_stats(as_namespace* ns, uint8_t result_code)
166{
167 switch (result_code) {
168 case AS_OK:
169 cf_atomic64_incr(&ns->n_client_proxy_complete);
170 break;
171 case AS_ERR_TIMEOUT:
172 cf_atomic64_incr(&ns->n_client_proxy_timeout);
173 break;
174 default:
175 cf_atomic64_incr(&ns->n_client_proxy_error);
176 break;
177 }
178}
179
180static inline void
181batch_sub_proxy_update_stats(as_namespace* ns, uint8_t result_code)
182{
183 switch (result_code) {
184 case AS_OK:
185 cf_atomic64_incr(&ns->n_batch_sub_proxy_complete);
186 break;
187 case AS_ERR_TIMEOUT:
188 cf_atomic64_incr(&ns->n_batch_sub_proxy_timeout);
189 break;
190 default:
191 cf_atomic64_incr(&ns->n_batch_sub_proxy_error);
192 break;
193 }
194}
195
196
197//==========================================================
198// Public API.
199//
200
201void
202as_proxy_init()
203{
204 g_proxy_hash = cf_shash_create(cf_shash_fn_u32, sizeof(uint32_t),
205 sizeof(proxy_request), 4 * 1024, CF_SHASH_MANY_LOCK);
206
207 cf_thread_create_detached(run_proxy_timeout, NULL);
208
209 as_fabric_register_msg_fn(M_TYPE_PROXY, proxy_mt, sizeof(proxy_mt),
210 PROXY_MSG_SCRATCH_SIZE, proxy_msg_cb, NULL);
211}
212
213
214uint32_t
215as_proxy_hash_count()
216{
217 return cf_shash_get_size(g_proxy_hash);
218}
219
220
221// Proxyer - divert a transaction request to another node.
222void
223as_proxy_divert(cf_node dst, as_transaction* tr, as_namespace* ns)
224{
225 // Special log detail.
226 switch (tr->origin) {
227 case FROM_CLIENT:
228 cf_detail_digest(AS_PROXY_DIVERT, &tr->keyd,
229 "{%s} diverting from client %s to node %lx ",
230 ns->name, tr->from.proto_fd_h->client, dst);
231 break;
232 case FROM_BATCH:
233 cf_detail_digest(AS_PROXY_DIVERT, &tr->keyd,
234 "{%s} diverting batch-sub from client %s to node %lx ",
235 ns->name, as_batch_get_fd_h(tr->from.batch_shared)->client,
236 dst);
237 break;
238 default:
239 cf_crash(AS_PROXY, "unexpected transaction origin %u", tr->origin);
240 break;
241 }
242
243 // Get a fabric message and fill it out.
244
245 msg* m = as_fabric_msg_get(M_TYPE_PROXY);
246
247 uint32_t tid = cf_atomic32_incr(&g_proxy_tid);
248
249 msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_REQUEST);
250 msg_set_uint32(m, PROXY_FIELD_TID, tid);
251 msg_set_buf(m, PROXY_FIELD_DIGEST, (void*)&tr->keyd, sizeof(cf_digest),
252 MSG_SET_COPY);
253
254 if (tr->origin == FROM_BATCH) {
255 as_msg_field* f = as_batch_get_predexp_mf(tr->from.batch_shared);
256
257 if (f == NULL) {
258 msg_set_buf(m, PROXY_FIELD_AS_PROTO, (void*)tr->msgp,
259 sizeof(as_proto) + tr->msgp->proto.sz, MSG_SET_COPY);
260 }
261 else {
262 cl_msg* msgp = new_msg_w_extra_field(tr->msgp, f);
263
264 msg_set_buf(m, PROXY_FIELD_AS_PROTO, (void*)msgp,
265 sizeof(as_proto) + msgp->proto.sz, MSG_SET_HANDOFF_MALLOC);
266 }
267 }
268 else {
269 msg_set_buf(m, PROXY_FIELD_AS_PROTO, (void*)tr->msgp,
270 sizeof(as_proto) + tr->msgp->proto.sz, MSG_SET_HANDOFF_MALLOC);
271 }
272
273 // Set up a proxy_request and insert it in the hash.
274
275 proxy_request pr;
276
277 pr.msg_fields = tr->msg_fields;
278
279 pr.origin = tr->origin;
280 pr.from_flags = tr->from_flags;
281 pr.from.any = tr->from.any;
282 pr.batch_index = tr->from_data.batch_index;
283
284 pr.start_time = tr->start_time;
285 pr.end_time = tr->end_time;
286
287 pr.fab_msg = m;
288
289 pr.ns = ns;
290
291 msg_incr_ref(m); // reference for the hash
292
293 cf_shash_put(g_proxy_hash, &tid, &pr);
294
295 tr->msgp = NULL; // pattern, not needed
296 tr->from.any = NULL; // pattern, not needed
297
298 // Send fabric message to remote node.
299
300 if (as_fabric_send(dst, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) {
301 as_fabric_msg_put(m);
302 }
303
304 as_health_add_node_counter(dst, AS_HEALTH_NODE_PROXIES);
305}
306
307
308// Proxyee - transaction reservation failed here, tell proxyer to try again.
309void
310as_proxy_return_to_sender(const as_transaction* tr, as_namespace* ns)
311{
312 msg* m = as_fabric_msg_get(M_TYPE_PROXY);
313 uint32_t pid = as_partition_getid(&tr->keyd);
314 cf_node redirect_node = as_partition_proxyee_redirect(ns, pid);
315
316 msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_RETURN_TO_SENDER);
317 msg_set_uint32(m, PROXY_FIELD_TID, tr->from_data.proxy_tid);
318 msg_set_uint64(m, PROXY_FIELD_REDIRECT,
319 redirect_node == (cf_node)0 ? tr->from.proxy_node : redirect_node);
320
321 if (as_fabric_send(tr->from.proxy_node, m, AS_FABRIC_CHANNEL_RW) !=
322 AS_FABRIC_SUCCESS) {
323 as_fabric_msg_put(m);
324 }
325}
326
327
328// Proxyee - transaction completed here, send response to proxyer.
329void
330as_proxy_send_response(cf_node dst, uint32_t proxy_tid, uint32_t result_code,
331 uint32_t generation, uint32_t void_time, as_msg_op** ops, as_bin** bins,
332 uint16_t bin_count, as_namespace* ns, uint64_t trid)
333{
334 msg* m = as_fabric_msg_get(M_TYPE_PROXY);
335
336 msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_RESPONSE);
337 msg_set_uint32(m, PROXY_FIELD_TID, proxy_tid);
338
339 size_t msg_sz = 0;
340 uint8_t* msgp = (uint8_t*)as_msg_make_response_msg(result_code, generation,
341 void_time, ops, bins, bin_count, ns, 0, &msg_sz, trid);
342
343 msg_set_buf(m, PROXY_FIELD_AS_PROTO, msgp, msg_sz, MSG_SET_HANDOFF_MALLOC);
344
345 if (as_fabric_send(dst, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) {
346 as_fabric_msg_put(m);
347 }
348}
349
350
351// Proxyee - transaction completed here, send response to proxyer.
352void
353as_proxy_send_ops_response(cf_node dst, uint32_t proxy_tid, cf_dyn_buf* db)
354{
355 msg* m = as_fabric_msg_get(M_TYPE_PROXY);
356
357 msg_set_uint32(m, PROXY_FIELD_OP, PROXY_OP_RESPONSE);
358 msg_set_uint32(m, PROXY_FIELD_TID, proxy_tid);
359
360 uint8_t* msgp = db->buf;
361 size_t msg_sz = db->used_sz;
362
363 if (db->is_stack) {
364 msg_set_buf(m, PROXY_FIELD_AS_PROTO, msgp, msg_sz, MSG_SET_COPY);
365 }
366 else {
367 msg_set_buf(m, PROXY_FIELD_AS_PROTO, msgp, msg_sz,
368 MSG_SET_HANDOFF_MALLOC);
369 db->buf = NULL; // the fabric owns the buffer now
370 }
371
372 if (as_fabric_send(dst, m, AS_FABRIC_CHANNEL_RW) != AS_FABRIC_SUCCESS) {
373 as_fabric_msg_put(m);
374 }
375}
376
377
378//==========================================================
379// Local helpers - proxyer.
380//
381
382cl_msg*
383new_msg_w_extra_field(const cl_msg* msgp, const as_msg_field* f)
384{
385 size_t old_sz = sizeof(as_proto) + msgp->proto.sz;
386 size_t extra_sz = sizeof(f->field_sz) + f->field_sz;
387 cl_msg* new_msgp = cf_malloc(old_sz + extra_sz);
388
389 memcpy(new_msgp, msgp, old_sz);
390 memcpy((uint8_t*)new_msgp + old_sz, f, extra_sz);
391
392 new_msgp->proto.sz += extra_sz;
393 new_msgp->msg.n_fields++;
394
395 return new_msgp;
396}
397
398void
399proxyer_handle_response(msg* m, uint32_t tid)
400{
401 proxy_request pr;
402
403 if (cf_shash_get_and_delete(g_proxy_hash, &tid, &pr) != CF_SHASH_OK) {
404 // Some other response (or timeout) has already finished this pr.
405 return;
406 }
407
408 cf_assert(pr.from.any, AS_PROXY, "origin %u has null 'from'", pr.origin);
409
410 int result;
411
412 switch (pr.origin) {
413 case FROM_CLIENT:
414 result = proxyer_handle_client_response(m, &pr);
415 client_proxy_update_stats(pr.ns, result);
416 break;
417 case FROM_BATCH:
418 result = proxyer_handle_batch_response(m, &pr);
419 batch_sub_proxy_update_stats(pr.ns, result);
420 // Note - no worries about msgp, proxy divert copied it.
421 break;
422 default:
423 cf_crash(AS_PROXY, "unexpected transaction origin %u", pr.origin);
424 break;
425 }
426
427 pr.from.any = NULL; // pattern, not needed
428
429 as_fabric_msg_put(pr.fab_msg);
430
431 // Note that this includes both origins.
432 if (pr.ns->proxy_hist_enabled) {
433 histogram_insert_data_point(pr.ns->proxy_hist, pr.start_time);
434 }
435}
436
437
438int
439proxyer_handle_client_response(msg* m, proxy_request* pr)
440{
441 uint8_t* proto;
442 size_t proto_sz;
443
444 if (msg_get_buf(m, PROXY_FIELD_AS_PROTO, &proto, &proto_sz,
445 MSG_GET_DIRECT) != 0) {
446 cf_warning(AS_PROXY, "msg get for proto failed");
447 return AS_ERR_UNKNOWN;
448 }
449
450 as_file_handle* fd_h = pr->from.proto_fd_h;
451
452 if (cf_socket_send_all(&fd_h->sock, proto, proto_sz, MSG_NOSIGNAL,
453 CF_SOCKET_TIMEOUT) < 0) {
454 // Common when a client aborts.
455 as_end_of_transaction_force_close(fd_h);
456 return AS_ERR_UNKNOWN;
457 }
458
459 as_end_of_transaction_ok(fd_h);
460 return AS_OK;
461}
462
463
464int
465proxyer_handle_batch_response(msg* m, proxy_request* pr)
466{
467 cl_msg* msgp;
468 size_t msgp_sz;
469
470 if (msg_get_buf(m, PROXY_FIELD_AS_PROTO, (uint8_t**)&msgp, &msgp_sz,
471 MSG_GET_DIRECT) != 0) {
472 cf_warning(AS_PROXY, "msg get for proto failed");
473 return AS_ERR_UNKNOWN;
474 }
475
476 cf_digest* keyd;
477
478 if (msg_get_buf(pr->fab_msg, PROXY_FIELD_DIGEST, (uint8_t**)&keyd, NULL,
479 MSG_GET_DIRECT) != 0) {
480 cf_crash(AS_PROXY, "original msg get for digest failed");
481 }
482
483 as_batch_add_proxy_result(pr->from.batch_shared, pr->batch_index, keyd,
484 msgp, msgp_sz);
485
486 return AS_OK;
487}
488
489
490void
491proxyer_handle_return_to_sender(msg* m, uint32_t tid)
492{
493 proxy_request* pr;
494 cf_mutex* lock;
495
496 if (cf_shash_get_vlock(g_proxy_hash, &tid, (void**)&pr, &lock) !=
497 CF_SHASH_OK) {
498 // Some other response (or timeout) has already finished this pr.
499 return;
500 }
501
502 cf_node redirect_node;
503
504 if (msg_get_uint64(m, PROXY_FIELD_REDIRECT, &redirect_node) == 0
505 && redirect_node != g_config.self_node
506 && redirect_node != (cf_node)0) {
507 // If this node was a "random" node, i.e. neither acting nor eventual
508 // master, it diverts to the eventual master (the best it can do.) The
509 // eventual master must inform this node about the acting master.
510
511 msg_incr_ref(pr->fab_msg);
512
513 if (as_fabric_send(redirect_node, pr->fab_msg, AS_FABRIC_CHANNEL_RW) !=
514 AS_FABRIC_SUCCESS) {
515 as_fabric_msg_put(pr->fab_msg);
516 }
517
518 cf_mutex_unlock(lock);
519 return;
520 }
521
522 cf_digest* keyd;
523
524 if (msg_get_buf(pr->fab_msg, PROXY_FIELD_DIGEST, (uint8_t**)&keyd, NULL,
525 MSG_GET_DIRECT) != 0) {
526 cf_crash(AS_PROXY, "original msg get for digest failed");
527 }
528
529 cl_msg* msgp;
530
531 // TODO - inefficient! Should be a way to 'take' a buffer from msg.
532 if (msg_get_buf(pr->fab_msg, PROXY_FIELD_AS_PROTO, (uint8_t**)&msgp, NULL,
533 MSG_GET_COPY_MALLOC) != 0) {
534 cf_crash(AS_PROXY, "original msg get for proto failed");
535 }
536
537 // Put the as_msg on the normal queue for processing.
538 as_transaction tr;
539 as_transaction_init_head(&tr, keyd, msgp);
540 // msgp might not have digest - batch sub-transactions, old clients.
541 // For old clients, will compute it again from msgp key and set.
542
543 tr.msg_fields = pr->msg_fields;
544 tr.origin = pr->origin;
545 tr.from_flags = pr->from_flags;
546 tr.from.any = pr->from.any;
547 tr.from_data.batch_index = pr->batch_index;
548 tr.start_time = pr->start_time;
549
550 as_service_enqueue_internal(&tr);
551
552 as_fabric_msg_put(pr->fab_msg);
553
554 cf_shash_delete_lockfree(g_proxy_hash, &tid);
555 cf_mutex_unlock(lock);
556}
557
558
559//==========================================================
560// Local helpers - proxyee.
561//
562
563void
564proxyee_handle_request(cf_node src, msg* m, uint32_t tid)
565{
566 cf_digest* keyd;
567
568 if (msg_get_buf(m, PROXY_FIELD_DIGEST, (uint8_t**)&keyd, NULL,
569 MSG_GET_DIRECT) != 0) {
570 cf_warning(AS_PROXY, "msg get for digest failed");
571 error_response(src, tid, AS_ERR_UNKNOWN);
572 return;
573 }
574
575 cl_msg* msgp;
576 size_t msgp_sz;
577
578 if (msg_get_buf(m, PROXY_FIELD_AS_PROTO, (uint8_t**)&msgp, &msgp_sz,
579 MSG_GET_COPY_MALLOC) != 0) {
580 cf_warning(AS_PROXY, "msg get for proto failed");
581 error_response(src, tid, AS_ERR_UNKNOWN);
582 return;
583 }
584
585 // Sanity check as_proto fields.
586 as_proto* proto = &msgp->proto;
587
588 if (! as_proto_wrapped_is_valid(proto, msgp_sz)) {
589 cf_warning(AS_PROXY, "bad proto: version %u, type %u, sz %lu [%lu]",
590 proto->version, proto->type, (uint64_t)proto->sz, msgp_sz);
591 error_response(src, tid, AS_ERR_UNKNOWN);
592 return;
593 }
594
595 // Put the as_msg on the normal queue for processing.
596 as_transaction tr;
597 as_transaction_init_head(&tr, keyd, msgp);
598 // msgp might not have digest - batch sub-transactions, old clients.
599 // For old clients, will compute it again from msgp key and set.
600
601 tr.start_time = cf_getns();
602
603 tr.origin = FROM_PROXY;
604 tr.from.proxy_node = src;
605 tr.from_data.proxy_tid = tid;
606
607 // Proxyer has already done byte swapping in as_msg.
608 if (! as_transaction_prepare(&tr, false)) {
609 cf_warning(AS_PROXY, "bad proxy msg");
610 error_response(src, tid, AS_ERR_UNKNOWN);
611 return;
612 }
613
614 // For batch sub-transactions, make sure we flag them so they're not
615 // mistaken for multi-record transactions (which never proxy).
616 if (as_transaction_has_no_key_or_digest(&tr)) {
617 tr.from_flags |= FROM_FLAG_BATCH_SUB;
618 }
619
620 as_service_enqueue_internal(&tr);
621}
622
623
624//==========================================================
625// Local helpers - timeout.
626//
627
628void*
629run_proxy_timeout(void* arg)
630{
631 while (true) {
632 usleep(75 * 1000);
633
634 now_times now;
635
636 now.now_ns = cf_getns();
637 now.now_ms = now.now_ns / 1000000;
638
639 cf_shash_reduce(g_proxy_hash, proxy_timeout_reduce_fn, &now);
640 }
641
642 return NULL;
643}
644
645
646int
647proxy_timeout_reduce_fn(const void* key, void* data, void* udata)
648{
649 proxy_request* pr = data;
650 now_times* now = (now_times*)udata;
651
652 if (now->now_ns < pr->end_time) {
653 return CF_SHASH_OK;
654 }
655
656 // Handle timeouts.
657
658 cf_assert(pr->from.any, AS_PROXY, "origin %u has null 'from'", pr->origin);
659
660 switch (pr->origin) {
661 case FROM_CLIENT:
662 // TODO - when it becomes important enough, find a way to echo trid.
663 as_msg_send_reply(pr->from.proto_fd_h, AS_ERR_TIMEOUT, 0, 0, NULL, NULL,
664 0, pr->ns, 0);
665 client_proxy_update_stats(pr->ns, AS_ERR_TIMEOUT);
666 break;
667 case FROM_BATCH:
668 as_batch_add_error(pr->from.batch_shared, pr->batch_index,
669 AS_ERR_TIMEOUT);
670 // Note - no worries about msgp, proxy divert copied it.
671 batch_sub_proxy_update_stats(pr->ns, AS_ERR_TIMEOUT);
672 break;
673 default:
674 cf_crash(AS_PROXY, "unexpected transaction origin %u", pr->origin);
675 break;
676 }
677
678 pr->from.any = NULL; // pattern, not needed
679 as_fabric_msg_put(pr->fab_msg);
680
681 return CF_SHASH_REDUCE_DELETE;
682}
683
684
685//==========================================================
686// Local helpers - handle PROXY fabric messages.
687//
688
689int
690proxy_msg_cb(cf_node src, msg* m, void* udata)
691{
692 uint32_t op;
693
694 if (msg_get_uint32(m, PROXY_FIELD_OP, &op) != 0) {
695 cf_warning(AS_PROXY, "msg get for op failed");
696 as_fabric_msg_put(m);
697 return 0;
698 }
699
700 uint32_t tid;
701
702 if (msg_get_uint32(m, PROXY_FIELD_TID, &tid) != 0) {
703 cf_warning(AS_PROXY, "msg get for tid failed");
704 as_fabric_msg_put(m);
705 return 0;
706 }
707
708 switch (op) {
709 case PROXY_OP_REQUEST:
710 proxyee_handle_request(src, m, tid);
711 break;
712 case PROXY_OP_RESPONSE:
713 proxyer_handle_response(m, tid);
714 break;
715 case PROXY_OP_RETURN_TO_SENDER:
716 proxyer_handle_return_to_sender(m, tid);
717 break;
718 default:
719 cf_warning(AS_PROXY, "received unexpected message op %u", op);
720 break;
721 }
722
723 as_fabric_msg_put(m);
724 return 0;
725}
726