1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31
32#include <stddef.h>
33#include "poller.hpp"
34#include "proxy.hpp"
35#include "likely.hpp"
36#include "msg.hpp"
37
38#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS \
39 && !defined ZMQ_HAVE_AIX
40#include <poll.h>
41#endif
42
43// These headers end up pulling in zmq.h somewhere in their include
44// dependency chain
45#include "socket_base.hpp"
46#include "err.hpp"
47
48#ifdef ZMQ_HAVE_POLLER
49
50#include "socket_poller.hpp"
51
52// Macros for repetitive code.
53
// Deletes the eight poller objects used by the poller-based zmq::proxy.
// The macro expands in the caller's scope and refers to the local pointer
// variables by name; pointers that are still NULL are harmless to delete.
// PROXY_CLEANUP() must not be used before these variables are initialized.
#define PROXY_CLEANUP()                                                        \
    do {                                                                       \
        delete poller_all;                                                     \
        delete poller_in;                                                      \
        delete poller_control;                                                 \
        delete poller_receive_blocked;                                         \
        delete poller_send_blocked;                                            \
        delete poller_both_blocked;                                            \
        delete poller_frontend_only;                                           \
        delete poller_backend_only;                                            \
    } while (false)
66
67
// Early-exit helper for the poller-based zmq::proxy: when the local 'rc' is
// negative it deletes all pollers and makes the enclosing function return
// the result of close_and_return (&msg, -1). It relies on 'rc', 'msg' and the
// poller pointers being in scope at the point of expansion.
#define CHECK_RC_EXIT_ON_FAILURE()                                             \
    do {                                                                       \
        if (rc < 0) {                                                          \
            PROXY_CLEANUP ();                                                  \
            return close_and_return (&msg, -1);                                \
        }                                                                      \
    } while (false)
75
76#endif // ZMQ_HAVE_POLLER
77
78
// Per-socket traffic counters, sent back in reply to a STATISTICS control command
80
// Traffic counters kept for one side of the proxy. Kept as a plain aggregate
// because the proxy zeroes instances with memset.
struct zmq_socket_stats_t
{
    // Number of messages received (a multipart message counts once).
    uint64_t msg_in;
    // Total payload bytes received.
    uint64_t bytes_in;
    // Number of messages sent (a multipart message counts once).
    uint64_t msg_out;
    // Total payload bytes sent.
    uint64_t bytes_out;
};
88
89
90// Utility functions
91
92int capture (class zmq::socket_base_t *capture_,
93 zmq::msg_t *msg_,
94 int more_ = 0)
95{
96 // Copy message to capture socket if any
97 if (capture_) {
98 zmq::msg_t ctrl;
99 int rc = ctrl.init ();
100 if (unlikely (rc < 0))
101 return -1;
102 rc = ctrl.copy (*msg_);
103 if (unlikely (rc < 0))
104 return -1;
105 rc = capture_->send (&ctrl, more_ ? ZMQ_SNDMORE : 0);
106 if (unlikely (rc < 0))
107 return -1;
108 }
109 return 0;
110}
111
112int forward (class zmq::socket_base_t *from_,
113 zmq_socket_stats_t *from_stats_,
114 class zmq::socket_base_t *to_,
115 zmq_socket_stats_t *to_stats_,
116 class zmq::socket_base_t *capture_,
117 zmq::msg_t *msg_)
118{
119 // Forward a burst of messages
120 for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) {
121 int more;
122 size_t moresz;
123 size_t complete_msg_size = 0;
124
125 // Forward all the parts of one message
126 while (true) {
127 int rc = from_->recv (msg_, ZMQ_DONTWAIT);
128 if (rc < 0) {
129 if (likely (errno == EAGAIN && i > 0))
130 return 0; // End of burst
131
132 return -1;
133 }
134
135 complete_msg_size += msg_->size ();
136
137 moresz = sizeof more;
138 rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
139 if (unlikely (rc < 0))
140 return -1;
141
142 // Copy message to capture socket if any
143 rc = capture (capture_, msg_, more);
144 if (unlikely (rc < 0))
145 return -1;
146
147 rc = to_->send (msg_, more ? ZMQ_SNDMORE : 0);
148 if (unlikely (rc < 0))
149 return -1;
150
151 if (more == 0)
152 break;
153 }
154
155 // A multipart message counts as 1 packet:
156 from_stats_->msg_in++;
157 from_stats_->bytes_in += complete_msg_size;
158 to_stats_->msg_out++;
159 to_stats_->bytes_out += complete_msg_size;
160 }
161
162 return 0;
163}
164
165static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
166 uint64_t stat_,
167 bool first_,
168 bool more_)
169{
170 int rc;
171 zmq::msg_t msg;
172
173 // VSM of 8 bytes can't fail to init
174 msg.init_size (sizeof (uint64_t));
175 memcpy (msg.data (), &stat_, sizeof (uint64_t));
176
177 // if the first message is handed to the pipe successfully then the HWM
178 // is not full, which means failures are due to interrupts (on Windows pipes
179 // are TCP sockets), so keep retrying
180 do {
181 rc = control_->send (&msg, more_ ? ZMQ_SNDMORE : 0);
182 } while (!first_ && rc != 0 && errno == EAGAIN);
183
184 return rc;
185}
186
187int reply_stats (class zmq::socket_base_t *control_,
188 zmq_socket_stats_t *frontend_stats_,
189 zmq_socket_stats_t *backend_stats_)
190{
191 // first part: frontend stats - the first send might fail due to HWM
192 if (loop_and_send_multipart_stat (control_, frontend_stats_->msg_in, true,
193 true)
194 != 0)
195 return -1;
196
197 loop_and_send_multipart_stat (control_, frontend_stats_->bytes_in, false,
198 true);
199 loop_and_send_multipart_stat (control_, frontend_stats_->msg_out, false,
200 true);
201 loop_and_send_multipart_stat (control_, frontend_stats_->bytes_out, false,
202 true);
203
204 // second part: backend stats
205 loop_and_send_multipart_stat (control_, backend_stats_->msg_in, false,
206 true);
207 loop_and_send_multipart_stat (control_, backend_stats_->bytes_in, false,
208 true);
209 loop_and_send_multipart_stat (control_, backend_stats_->msg_out, false,
210 true);
211 loop_and_send_multipart_stat (control_, backend_stats_->bytes_out, false,
212 false);
213
214 return 0;
215}
216
217
218#ifdef ZMQ_HAVE_POLLER
219
220int zmq::proxy (class socket_base_t *frontend_,
221 class socket_base_t *backend_,
222 class socket_base_t *capture_,
223 class socket_base_t *control_)
224{
225 msg_t msg;
226 int rc = msg.init ();
227 if (rc != 0)
228 return -1;
229
230 // The algorithm below assumes ratio of requests and replies processed
231 // under full load to be 1:1.
232
233 int more;
234 size_t moresz = sizeof (more);
235
236 // Proxy can be in these three states
237 enum
238 {
239 active,
240 paused,
241 terminated
242 } state = active;
243
244 bool frontend_equal_to_backend;
245 bool frontend_in = false;
246 bool frontend_out = false;
247 bool backend_in = false;
248 bool backend_out = false;
249 bool control_in = false;
250 zmq::socket_poller_t::event_t events[3];
251 zmq_socket_stats_t frontend_stats;
252 zmq_socket_stats_t backend_stats;
253 memset (&frontend_stats, 0, sizeof (frontend_stats));
254 memset (&backend_stats, 0, sizeof (backend_stats));
255
256 // Don't allocate these pollers from stack because they will take more than 900 kB of stack!
257 // On Windows this blows up default stack of 1 MB and aborts the program.
258 // I wanted to use std::shared_ptr here as the best solution but that requires C++11...
259 zmq::socket_poller_t *poller_all =
260 new (std::nothrow) zmq::socket_poller_t; // Poll for everything.
261 zmq::socket_poller_t *poller_in = new (std::nothrow) zmq::
262 socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop.
263 zmq::socket_poller_t *poller_control = new (std::nothrow) zmq::
264 socket_poller_t; // Poll only for 'ZMQ_POLLIN' on 'control_', when proxy is paused.
265 zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow)
266 zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'.
267
268 // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored.
269 // In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'.
270 // We also don't need 'poller_both_blocked', 'poller_backend_only' nor 'poller_frontend_only' no need to initialize it.
271 // We save some RAM and time for initialization.
272 zmq::socket_poller_t *poller_send_blocked =
273 NULL; // All except 'ZMQ_POLLIN' on 'backend_'.
274 zmq::socket_poller_t *poller_both_blocked =
275 NULL; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
276 zmq::socket_poller_t *poller_frontend_only =
277 NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
278 zmq::socket_poller_t *poller_backend_only =
279 NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
280
281 if (frontend_ != backend_) {
282 poller_send_blocked = new (std::nothrow)
283 zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'backend_'.
284 poller_both_blocked = new (std::nothrow) zmq::
285 socket_poller_t; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
286 poller_frontend_only = new (std::nothrow) zmq::
287 socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
288 poller_backend_only = new (std::nothrow) zmq::
289 socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
290 frontend_equal_to_backend = false;
291 } else
292 frontend_equal_to_backend = true;
293
294 if (poller_all == NULL || poller_in == NULL || poller_control == NULL
295 || poller_receive_blocked == NULL
296 || ((poller_send_blocked == NULL || poller_both_blocked == NULL)
297 && !frontend_equal_to_backend)) {
298 PROXY_CLEANUP ();
299 return close_and_return (&msg, -1);
300 }
301
302 zmq::socket_poller_t *poller_wait =
303 poller_in; // Poller for blocking wait, initially all 'ZMQ_POLLIN'.
304
305 // Register 'frontend_' and 'backend_' with pollers.
306 rc = poller_all->add (frontend_, NULL,
307 ZMQ_POLLIN | ZMQ_POLLOUT); // Everything.
308 CHECK_RC_EXIT_ON_FAILURE ();
309 rc = poller_in->add (frontend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's.
310 CHECK_RC_EXIT_ON_FAILURE ();
311
312 if (frontend_equal_to_backend) {
313 // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same,
314 // so we don't need 'poller_send_blocked'. We need only 'poller_receive_blocked'.
315 // We also don't need 'poller_both_blocked', no need to initialize it.
316 rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT);
317 CHECK_RC_EXIT_ON_FAILURE ();
318 } else {
319 rc = poller_all->add (backend_, NULL,
320 ZMQ_POLLIN | ZMQ_POLLOUT); // Everything.
321 CHECK_RC_EXIT_ON_FAILURE ();
322 rc = poller_in->add (backend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's.
323 CHECK_RC_EXIT_ON_FAILURE ();
324 rc = poller_both_blocked->add (
325 frontend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'.
326 CHECK_RC_EXIT_ON_FAILURE ();
327 rc = poller_both_blocked->add (
328 backend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'.
329 CHECK_RC_EXIT_ON_FAILURE ();
330 rc = poller_send_blocked->add (
331 backend_, NULL,
332 ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'.
333 CHECK_RC_EXIT_ON_FAILURE ();
334 rc = poller_send_blocked->add (
335 frontend_, NULL,
336 ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'.
337 CHECK_RC_EXIT_ON_FAILURE ();
338 rc = poller_receive_blocked->add (
339 frontend_, NULL,
340 ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'.
341 CHECK_RC_EXIT_ON_FAILURE ();
342 rc = poller_receive_blocked->add (
343 backend_, NULL,
344 ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'.
345 CHECK_RC_EXIT_ON_FAILURE ();
346 rc =
347 poller_frontend_only->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
348 CHECK_RC_EXIT_ON_FAILURE ();
349 rc =
350 poller_backend_only->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
351 CHECK_RC_EXIT_ON_FAILURE ();
352 }
353
354 // Register 'control_' with pollers.
355 if (control_ != NULL) {
356 rc = poller_all->add (control_, NULL, ZMQ_POLLIN);
357 CHECK_RC_EXIT_ON_FAILURE ();
358 rc = poller_in->add (control_, NULL, ZMQ_POLLIN);
359 CHECK_RC_EXIT_ON_FAILURE ();
360 rc = poller_control->add (
361 control_, NULL,
362 ZMQ_POLLIN); // When proxy is paused we wait only for ZMQ_POLLIN on 'control_' socket.
363 CHECK_RC_EXIT_ON_FAILURE ();
364 rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN);
365 CHECK_RC_EXIT_ON_FAILURE ();
366 if (!frontend_equal_to_backend) {
367 rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN);
368 CHECK_RC_EXIT_ON_FAILURE ();
369 rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN);
370 CHECK_RC_EXIT_ON_FAILURE ();
371 rc = poller_frontend_only->add (control_, NULL, ZMQ_POLLIN);
372 CHECK_RC_EXIT_ON_FAILURE ();
373 rc = poller_backend_only->add (control_, NULL, ZMQ_POLLIN);
374 CHECK_RC_EXIT_ON_FAILURE ();
375 }
376 }
377
378 bool request_processed, reply_processed;
379
380 while (state != terminated) {
381 // Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'.
382 // If one of receiving end's queue is full ('ZMQ_POLLOUT' not available),
383 // 'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'.
384 rc = poller_wait->wait (events, 3, -1);
385 if (rc < 0 && errno == EAGAIN)
386 rc = 0;
387 CHECK_RC_EXIT_ON_FAILURE ();
388
389 // Some of events waited for by 'poller_wait' have arrived, now poll for everything without blocking.
390 rc = poller_all->wait (events, 3, 0);
391 if (rc < 0 && errno == EAGAIN)
392 rc = 0;
393 CHECK_RC_EXIT_ON_FAILURE ();
394
395 // Process events.
396 for (int i = 0; i < rc; i++) {
397 if (events[i].socket == frontend_) {
398 frontend_in = (events[i].events & ZMQ_POLLIN) != 0;
399 frontend_out = (events[i].events & ZMQ_POLLOUT) != 0;
400 } else
401 // This 'if' needs to be after check for 'frontend_' in order never
402 // to be reached in case frontend_==backend_, so we ensure backend_in=false in that case.
403 if (events[i].socket == backend_) {
404 backend_in = (events[i].events & ZMQ_POLLIN) != 0;
405 backend_out = (events[i].events & ZMQ_POLLOUT) != 0;
406 } else if (events[i].socket == control_)
407 control_in = (events[i].events & ZMQ_POLLIN) != 0;
408 }
409
410
411 // Process a control command if any.
412 if (control_in) {
413 rc = control_->recv (&msg, 0);
414 CHECK_RC_EXIT_ON_FAILURE ();
415 rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
416 if (unlikely (rc < 0) || more) {
417 PROXY_CLEANUP ();
418 return close_and_return (&msg, -1);
419 }
420
421 // Copy message to capture socket if any.
422 rc = capture (capture_, &msg);
423 CHECK_RC_EXIT_ON_FAILURE ();
424
425 if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) {
426 state = paused;
427 poller_wait = poller_control;
428 } else if (msg.size () == 6
429 && memcmp (msg.data (), "RESUME", 6) == 0) {
430 state = active;
431 poller_wait = poller_in;
432 } else {
433 if (msg.size () == 9
434 && memcmp (msg.data (), "TERMINATE", 9) == 0)
435 state = terminated;
436 else {
437 if (msg.size () == 10
438 && memcmp (msg.data (), "STATISTICS", 10) == 0) {
439 rc = reply_stats (control_, &frontend_stats,
440 &backend_stats);
441 CHECK_RC_EXIT_ON_FAILURE ();
442 } else {
443 // This is an API error, we assert
444 puts ("E: invalid command sent to proxy");
445 zmq_assert (false);
446 }
447 }
448 }
449 control_in = false;
450 }
451
452 if (state == active) {
453 // Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'.
454 // In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
455 if (frontend_in && (backend_out || frontend_equal_to_backend)) {
456 rc = forward (frontend_, &frontend_stats, backend_,
457 &backend_stats, capture_, &msg);
458 CHECK_RC_EXIT_ON_FAILURE ();
459 request_processed = true;
460 frontend_in = backend_out = false;
461 } else
462 request_processed = false;
463
464 // Process a reply, 'ZMQ_POLLIN' on 'backend_' and 'ZMQ_POLLOUT' on 'frontend_'.
465 // If 'frontend_' and 'backend_' are the same this is not needed because previous processing
466 // covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to
467 // design in 'for' event processing loop.
468 if (backend_in && frontend_out) {
469 rc = forward (backend_, &backend_stats, frontend_,
470 &frontend_stats, capture_, &msg);
471 CHECK_RC_EXIT_ON_FAILURE ();
472 reply_processed = true;
473 backend_in = frontend_out = false;
474 } else
475 reply_processed = false;
476
477 if (request_processed || reply_processed) {
478 // If request/reply is processed that means we had at least one 'ZMQ_POLLOUT' event.
479 // Enable corresponding 'ZMQ_POLLIN' for blocking wait if any was disabled.
480 if (poller_wait != poller_in) {
481 if (request_processed) { // 'frontend_' -> 'backend_'
482 if (poller_wait == poller_both_blocked)
483 poller_wait = poller_send_blocked;
484 else if (poller_wait == poller_receive_blocked
485 || poller_wait == poller_frontend_only)
486 poller_wait = poller_in;
487 }
488 if (reply_processed) { // 'backend_' -> 'frontend_'
489 if (poller_wait == poller_both_blocked)
490 poller_wait = poller_receive_blocked;
491 else if (poller_wait == poller_send_blocked
492 || poller_wait == poller_backend_only)
493 poller_wait = poller_in;
494 }
495 }
496 } else {
497 // No requests have been processed, there were no 'ZMQ_POLLIN' with corresponding 'ZMQ_POLLOUT' events.
498 // That means that out queue(s) is/are full or one out queue is full and second one has no messages to process.
499 // Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT',
500 // or wait only on both 'backend_''s or 'frontend_''s 'ZMQ_POLLIN' and 'ZMQ_POLLOUT'.
501 if (frontend_in) {
502 if (frontend_out)
503 // If frontend_in and frontend_out are true, obviously backend_in and backend_out are both false.
504 // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'backend_'.
505 // We'll never get here in case of frontend_==backend_ because then frontend_out will always be false.
506 poller_wait = poller_backend_only;
507 else {
508 if (poller_wait == poller_send_blocked)
509 poller_wait = poller_both_blocked;
510 else if (poller_wait == poller_in)
511 poller_wait = poller_receive_blocked;
512 }
513 }
514 if (backend_in) {
515 // Will never be reached if frontend_==backend_, 'backend_in' will
516 // always be false due to design in 'for' event processing loop.
517 if (backend_out)
518 // If backend_in and backend_out are true, obviously frontend_in and frontend_out are both false.
519 // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'frontend_'.
520 poller_wait = poller_frontend_only;
521 else {
522 if (poller_wait == poller_receive_blocked)
523 poller_wait = poller_both_blocked;
524 else if (poller_wait == poller_in)
525 poller_wait = poller_send_blocked;
526 }
527 }
528 }
529 }
530 }
531 PROXY_CLEANUP ();
532 return close_and_return (&msg, 0);
533}
534
535#else // ZMQ_HAVE_POLLER
536
537int zmq::proxy (class socket_base_t *frontend_,
538 class socket_base_t *backend_,
539 class socket_base_t *capture_,
540 class socket_base_t *control_)
541{
542 msg_t msg;
543 int rc = msg.init ();
544 if (rc != 0)
545 return -1;
546
547 // The algorithm below assumes ratio of requests and replies processed
548 // under full load to be 1:1.
549
550 int more;
551 size_t moresz;
552 zmq_pollitem_t items[] = {{frontend_, 0, ZMQ_POLLIN, 0},
553 {backend_, 0, ZMQ_POLLIN, 0},
554 {control_, 0, ZMQ_POLLIN, 0}};
555 int qt_poll_items = (control_ ? 3 : 2);
556 zmq_pollitem_t itemsout[] = {{frontend_, 0, ZMQ_POLLOUT, 0},
557 {backend_, 0, ZMQ_POLLOUT, 0}};
558
559 zmq_socket_stats_t frontend_stats;
560 memset (&frontend_stats, 0, sizeof (frontend_stats));
561 zmq_socket_stats_t backend_stats;
562 memset (&backend_stats, 0, sizeof (backend_stats));
563
564 // Proxy can be in these three states
565 enum
566 {
567 active,
568 paused,
569 terminated
570 } state = active;
571
572 while (state != terminated) {
573 // Wait while there are either requests or replies to process.
574 rc = zmq_poll (&items[0], qt_poll_items, -1);
575 if (unlikely (rc < 0))
576 return close_and_return (&msg, -1);
577
578 // Get the pollout separately because when combining this with pollin it maxes the CPU
579 // because pollout shall most of the time return directly.
580 // POLLOUT is only checked when frontend and backend sockets are not the same.
581 if (frontend_ != backend_) {
582 rc = zmq_poll (&itemsout[0], 2, 0);
583 if (unlikely (rc < 0)) {
584 return close_and_return (&msg, -1);
585 }
586 }
587
588 // Process a control command if any
589 if (control_ && items[2].revents & ZMQ_POLLIN) {
590 rc = control_->recv (&msg, 0);
591 if (unlikely (rc < 0))
592 return close_and_return (&msg, -1);
593
594 moresz = sizeof more;
595 rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
596 if (unlikely (rc < 0) || more)
597 return close_and_return (&msg, -1);
598
599 // Copy message to capture socket if any
600 rc = capture (capture_, &msg);
601 if (unlikely (rc < 0))
602 return close_and_return (&msg, -1);
603
604 if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)
605 state = paused;
606 else if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
607 state = active;
608 else if (msg.size () == 9
609 && memcmp (msg.data (), "TERMINATE", 9) == 0)
610 state = terminated;
611 else {
612 if (msg.size () == 10
613 && memcmp (msg.data (), "STATISTICS", 10) == 0) {
614 rc =
615 reply_stats (control_, &frontend_stats, &backend_stats);
616 if (unlikely (rc < 0))
617 return close_and_return (&msg, -1);
618 } else {
619 // This is an API error, we assert
620 puts ("E: invalid command sent to proxy");
621 zmq_assert (false);
622 }
623 }
624 }
625 // Process a request
626 if (state == active && items[0].revents & ZMQ_POLLIN
627 && (frontend_ == backend_ || itemsout[1].revents & ZMQ_POLLOUT)) {
628 rc = forward (frontend_, &frontend_stats, backend_, &backend_stats,
629 capture_, &msg);
630 if (unlikely (rc < 0))
631 return close_and_return (&msg, -1);
632 }
633 // Process a reply
634 if (state == active && frontend_ != backend_
635 && items[1].revents & ZMQ_POLLIN
636 && itemsout[0].revents & ZMQ_POLLOUT) {
637 rc = forward (backend_, &backend_stats, frontend_, &frontend_stats,
638 capture_, &msg);
639 if (unlikely (rc < 0))
640 return close_and_return (&msg, -1);
641 }
642 }
643
644 return close_and_return (&msg, 0);
645}
646
647#endif // ZMQ_HAVE_POLLER
648