1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | |
32 | #include <stddef.h> |
33 | #include "poller.hpp" |
34 | #include "proxy.hpp" |
35 | #include "likely.hpp" |
36 | #include "msg.hpp" |
37 | |
38 | #if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS \ |
39 | && !defined ZMQ_HAVE_AIX |
40 | #include <poll.h> |
41 | #endif |
42 | |
43 | // These headers end up pulling in zmq.h somewhere in their include |
44 | // dependency chain |
45 | #include "socket_base.hpp" |
46 | #include "err.hpp" |
47 | |
48 | #ifdef ZMQ_HAVE_POLLER |
49 | |
50 | #include "socket_poller.hpp" |
51 | |
//  Helper macros shared by the poller-based proxy implementation.

//  Releases every poller the proxy may have allocated. Pollers that were
//  never allocated are expected to hold NULL, which 'delete' ignores.
//  All eight 'poller_*' locals must already be initialized (to a valid
//  object or to NULL) at the point where this macro is expanded.
#define PROXY_CLEANUP()                                                        \
    do {                                                                       \
        delete poller_all;                                                     \
        delete poller_in;                                                      \
        delete poller_control;                                                 \
        delete poller_receive_blocked;                                         \
        delete poller_send_blocked;                                            \
        delete poller_both_blocked;                                            \
        delete poller_frontend_only;                                           \
        delete poller_backend_only;                                            \
    } while (false)


//  Bails out of the enclosing function when the local 'rc' is negative:
//  the pollers are released, the local 'msg' is closed and -1 is returned.
//  A non-negative 'rc' falls straight through.
#define CHECK_RC_EXIT_ON_FAILURE()                                             \
    do {                                                                       \
        if (rc >= 0)                                                           \
            break;                                                             \
        PROXY_CLEANUP ();                                                      \
        return close_and_return (&msg, -1);                                    \
    } while (false)
75 | |
76 | #endif // ZMQ_HAVE_POLLER |
77 | |
78 | |
79 | // Control socket messages |
80 | |
//  Per-socket traffic counters kept by the proxy and reported in reply to
//  the "STATISTICS" control command. A multipart message is counted as a
//  single message; byte counters accumulate the sizes of all its parts.
struct zmq_socket_stats_t
{
    uint64_t msg_in;    //  messages received from this socket
    uint64_t bytes_in;  //  bytes received from this socket
    uint64_t msg_out;   //  messages sent to this socket
    uint64_t bytes_out; //  bytes sent to this socket
};
88 | |
89 | |
90 | // Utility functions |
91 | |
92 | int capture (class zmq::socket_base_t *capture_, |
93 | zmq::msg_t *msg_, |
94 | int more_ = 0) |
95 | { |
96 | // Copy message to capture socket if any |
97 | if (capture_) { |
98 | zmq::msg_t ctrl; |
99 | int rc = ctrl.init (); |
100 | if (unlikely (rc < 0)) |
101 | return -1; |
102 | rc = ctrl.copy (*msg_); |
103 | if (unlikely (rc < 0)) |
104 | return -1; |
105 | rc = capture_->send (&ctrl, more_ ? ZMQ_SNDMORE : 0); |
106 | if (unlikely (rc < 0)) |
107 | return -1; |
108 | } |
109 | return 0; |
110 | } |
111 | |
112 | int forward (class zmq::socket_base_t *from_, |
113 | zmq_socket_stats_t *from_stats_, |
114 | class zmq::socket_base_t *to_, |
115 | zmq_socket_stats_t *to_stats_, |
116 | class zmq::socket_base_t *capture_, |
117 | zmq::msg_t *msg_) |
118 | { |
119 | // Forward a burst of messages |
120 | for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) { |
121 | int more; |
122 | size_t moresz; |
123 | size_t complete_msg_size = 0; |
124 | |
125 | // Forward all the parts of one message |
126 | while (true) { |
127 | int rc = from_->recv (msg_, ZMQ_DONTWAIT); |
128 | if (rc < 0) { |
129 | if (likely (errno == EAGAIN && i > 0)) |
130 | return 0; // End of burst |
131 | |
132 | return -1; |
133 | } |
134 | |
135 | complete_msg_size += msg_->size (); |
136 | |
137 | moresz = sizeof more; |
138 | rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz); |
139 | if (unlikely (rc < 0)) |
140 | return -1; |
141 | |
142 | // Copy message to capture socket if any |
143 | rc = capture (capture_, msg_, more); |
144 | if (unlikely (rc < 0)) |
145 | return -1; |
146 | |
147 | rc = to_->send (msg_, more ? ZMQ_SNDMORE : 0); |
148 | if (unlikely (rc < 0)) |
149 | return -1; |
150 | |
151 | if (more == 0) |
152 | break; |
153 | } |
154 | |
155 | // A multipart message counts as 1 packet: |
156 | from_stats_->msg_in++; |
157 | from_stats_->bytes_in += complete_msg_size; |
158 | to_stats_->msg_out++; |
159 | to_stats_->bytes_out += complete_msg_size; |
160 | } |
161 | |
162 | return 0; |
163 | } |
164 | |
165 | static int loop_and_send_multipart_stat (zmq::socket_base_t *control_, |
166 | uint64_t stat_, |
167 | bool first_, |
168 | bool more_) |
169 | { |
170 | int rc; |
171 | zmq::msg_t msg; |
172 | |
173 | // VSM of 8 bytes can't fail to init |
174 | msg.init_size (sizeof (uint64_t)); |
175 | memcpy (msg.data (), &stat_, sizeof (uint64_t)); |
176 | |
177 | // if the first message is handed to the pipe successfully then the HWM |
178 | // is not full, which means failures are due to interrupts (on Windows pipes |
179 | // are TCP sockets), so keep retrying |
180 | do { |
181 | rc = control_->send (&msg, more_ ? ZMQ_SNDMORE : 0); |
182 | } while (!first_ && rc != 0 && errno == EAGAIN); |
183 | |
184 | return rc; |
185 | } |
186 | |
187 | int reply_stats (class zmq::socket_base_t *control_, |
188 | zmq_socket_stats_t *frontend_stats_, |
189 | zmq_socket_stats_t *backend_stats_) |
190 | { |
191 | // first part: frontend stats - the first send might fail due to HWM |
192 | if (loop_and_send_multipart_stat (control_, frontend_stats_->msg_in, true, |
193 | true) |
194 | != 0) |
195 | return -1; |
196 | |
197 | loop_and_send_multipart_stat (control_, frontend_stats_->bytes_in, false, |
198 | true); |
199 | loop_and_send_multipart_stat (control_, frontend_stats_->msg_out, false, |
200 | true); |
201 | loop_and_send_multipart_stat (control_, frontend_stats_->bytes_out, false, |
202 | true); |
203 | |
204 | // second part: backend stats |
205 | loop_and_send_multipart_stat (control_, backend_stats_->msg_in, false, |
206 | true); |
207 | loop_and_send_multipart_stat (control_, backend_stats_->bytes_in, false, |
208 | true); |
209 | loop_and_send_multipart_stat (control_, backend_stats_->msg_out, false, |
210 | true); |
211 | loop_and_send_multipart_stat (control_, backend_stats_->bytes_out, false, |
212 | false); |
213 | |
214 | return 0; |
215 | } |
216 | |
217 | |
218 | #ifdef ZMQ_HAVE_POLLER |
219 | |
220 | int zmq::proxy (class socket_base_t *frontend_, |
221 | class socket_base_t *backend_, |
222 | class socket_base_t *capture_, |
223 | class socket_base_t *control_) |
224 | { |
225 | msg_t msg; |
226 | int rc = msg.init (); |
227 | if (rc != 0) |
228 | return -1; |
229 | |
230 | // The algorithm below assumes ratio of requests and replies processed |
231 | // under full load to be 1:1. |
232 | |
233 | int more; |
234 | size_t moresz = sizeof (more); |
235 | |
236 | // Proxy can be in these three states |
237 | enum |
238 | { |
239 | active, |
240 | paused, |
241 | terminated |
242 | } state = active; |
243 | |
244 | bool frontend_equal_to_backend; |
245 | bool frontend_in = false; |
246 | bool frontend_out = false; |
247 | bool backend_in = false; |
248 | bool backend_out = false; |
249 | bool control_in = false; |
250 | zmq::socket_poller_t::event_t events[3]; |
251 | zmq_socket_stats_t frontend_stats; |
252 | zmq_socket_stats_t backend_stats; |
253 | memset (&frontend_stats, 0, sizeof (frontend_stats)); |
254 | memset (&backend_stats, 0, sizeof (backend_stats)); |
255 | |
256 | // Don't allocate these pollers from stack because they will take more than 900 kB of stack! |
257 | // On Windows this blows up default stack of 1 MB and aborts the program. |
258 | // I wanted to use std::shared_ptr here as the best solution but that requires C++11... |
259 | zmq::socket_poller_t *poller_all = |
260 | new (std::nothrow) zmq::socket_poller_t; // Poll for everything. |
261 | zmq::socket_poller_t *poller_in = new (std::nothrow) zmq:: |
262 | socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop. |
263 | zmq::socket_poller_t *poller_control = new (std::nothrow) zmq:: |
264 | socket_poller_t; // Poll only for 'ZMQ_POLLIN' on 'control_', when proxy is paused. |
265 | zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow) |
266 | zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'. |
267 | |
268 | // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored. |
269 | // In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'. |
270 | // We also don't need 'poller_both_blocked', 'poller_backend_only' nor 'poller_frontend_only' no need to initialize it. |
271 | // We save some RAM and time for initialization. |
272 | zmq::socket_poller_t *poller_send_blocked = |
273 | NULL; // All except 'ZMQ_POLLIN' on 'backend_'. |
274 | zmq::socket_poller_t *poller_both_blocked = |
275 | NULL; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'. |
276 | zmq::socket_poller_t *poller_frontend_only = |
277 | NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'. |
278 | zmq::socket_poller_t *poller_backend_only = |
279 | NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'. |
280 | |
281 | if (frontend_ != backend_) { |
282 | poller_send_blocked = new (std::nothrow) |
283 | zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'backend_'. |
284 | poller_both_blocked = new (std::nothrow) zmq:: |
285 | socket_poller_t; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'. |
286 | poller_frontend_only = new (std::nothrow) zmq:: |
287 | socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'. |
288 | poller_backend_only = new (std::nothrow) zmq:: |
289 | socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'. |
290 | frontend_equal_to_backend = false; |
291 | } else |
292 | frontend_equal_to_backend = true; |
293 | |
294 | if (poller_all == NULL || poller_in == NULL || poller_control == NULL |
295 | || poller_receive_blocked == NULL |
296 | || ((poller_send_blocked == NULL || poller_both_blocked == NULL) |
297 | && !frontend_equal_to_backend)) { |
298 | PROXY_CLEANUP (); |
299 | return close_and_return (&msg, -1); |
300 | } |
301 | |
302 | zmq::socket_poller_t *poller_wait = |
303 | poller_in; // Poller for blocking wait, initially all 'ZMQ_POLLIN'. |
304 | |
305 | // Register 'frontend_' and 'backend_' with pollers. |
306 | rc = poller_all->add (frontend_, NULL, |
307 | ZMQ_POLLIN | ZMQ_POLLOUT); // Everything. |
308 | CHECK_RC_EXIT_ON_FAILURE (); |
309 | rc = poller_in->add (frontend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's. |
310 | CHECK_RC_EXIT_ON_FAILURE (); |
311 | |
312 | if (frontend_equal_to_backend) { |
313 | // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, |
314 | // so we don't need 'poller_send_blocked'. We need only 'poller_receive_blocked'. |
315 | // We also don't need 'poller_both_blocked', no need to initialize it. |
316 | rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT); |
317 | CHECK_RC_EXIT_ON_FAILURE (); |
318 | } else { |
319 | rc = poller_all->add (backend_, NULL, |
320 | ZMQ_POLLIN | ZMQ_POLLOUT); // Everything. |
321 | CHECK_RC_EXIT_ON_FAILURE (); |
322 | rc = poller_in->add (backend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's. |
323 | CHECK_RC_EXIT_ON_FAILURE (); |
324 | rc = poller_both_blocked->add ( |
325 | frontend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'. |
326 | CHECK_RC_EXIT_ON_FAILURE (); |
327 | rc = poller_both_blocked->add ( |
328 | backend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'. |
329 | CHECK_RC_EXIT_ON_FAILURE (); |
330 | rc = poller_send_blocked->add ( |
331 | backend_, NULL, |
332 | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'. |
333 | CHECK_RC_EXIT_ON_FAILURE (); |
334 | rc = poller_send_blocked->add ( |
335 | frontend_, NULL, |
336 | ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'. |
337 | CHECK_RC_EXIT_ON_FAILURE (); |
338 | rc = poller_receive_blocked->add ( |
339 | frontend_, NULL, |
340 | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'. |
341 | CHECK_RC_EXIT_ON_FAILURE (); |
342 | rc = poller_receive_blocked->add ( |
343 | backend_, NULL, |
344 | ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'. |
345 | CHECK_RC_EXIT_ON_FAILURE (); |
346 | rc = |
347 | poller_frontend_only->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); |
348 | CHECK_RC_EXIT_ON_FAILURE (); |
349 | rc = |
350 | poller_backend_only->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); |
351 | CHECK_RC_EXIT_ON_FAILURE (); |
352 | } |
353 | |
354 | // Register 'control_' with pollers. |
355 | if (control_ != NULL) { |
356 | rc = poller_all->add (control_, NULL, ZMQ_POLLIN); |
357 | CHECK_RC_EXIT_ON_FAILURE (); |
358 | rc = poller_in->add (control_, NULL, ZMQ_POLLIN); |
359 | CHECK_RC_EXIT_ON_FAILURE (); |
360 | rc = poller_control->add ( |
361 | control_, NULL, |
362 | ZMQ_POLLIN); // When proxy is paused we wait only for ZMQ_POLLIN on 'control_' socket. |
363 | CHECK_RC_EXIT_ON_FAILURE (); |
364 | rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN); |
365 | CHECK_RC_EXIT_ON_FAILURE (); |
366 | if (!frontend_equal_to_backend) { |
367 | rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN); |
368 | CHECK_RC_EXIT_ON_FAILURE (); |
369 | rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN); |
370 | CHECK_RC_EXIT_ON_FAILURE (); |
371 | rc = poller_frontend_only->add (control_, NULL, ZMQ_POLLIN); |
372 | CHECK_RC_EXIT_ON_FAILURE (); |
373 | rc = poller_backend_only->add (control_, NULL, ZMQ_POLLIN); |
374 | CHECK_RC_EXIT_ON_FAILURE (); |
375 | } |
376 | } |
377 | |
378 | bool request_processed, reply_processed; |
379 | |
380 | while (state != terminated) { |
381 | // Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'. |
382 | // If one of receiving end's queue is full ('ZMQ_POLLOUT' not available), |
383 | // 'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'. |
384 | rc = poller_wait->wait (events, 3, -1); |
385 | if (rc < 0 && errno == EAGAIN) |
386 | rc = 0; |
387 | CHECK_RC_EXIT_ON_FAILURE (); |
388 | |
389 | // Some of events waited for by 'poller_wait' have arrived, now poll for everything without blocking. |
390 | rc = poller_all->wait (events, 3, 0); |
391 | if (rc < 0 && errno == EAGAIN) |
392 | rc = 0; |
393 | CHECK_RC_EXIT_ON_FAILURE (); |
394 | |
395 | // Process events. |
396 | for (int i = 0; i < rc; i++) { |
397 | if (events[i].socket == frontend_) { |
398 | frontend_in = (events[i].events & ZMQ_POLLIN) != 0; |
399 | frontend_out = (events[i].events & ZMQ_POLLOUT) != 0; |
400 | } else |
401 | // This 'if' needs to be after check for 'frontend_' in order never |
402 | // to be reached in case frontend_==backend_, so we ensure backend_in=false in that case. |
403 | if (events[i].socket == backend_) { |
404 | backend_in = (events[i].events & ZMQ_POLLIN) != 0; |
405 | backend_out = (events[i].events & ZMQ_POLLOUT) != 0; |
406 | } else if (events[i].socket == control_) |
407 | control_in = (events[i].events & ZMQ_POLLIN) != 0; |
408 | } |
409 | |
410 | |
411 | // Process a control command if any. |
412 | if (control_in) { |
413 | rc = control_->recv (&msg, 0); |
414 | CHECK_RC_EXIT_ON_FAILURE (); |
415 | rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); |
416 | if (unlikely (rc < 0) || more) { |
417 | PROXY_CLEANUP (); |
418 | return close_and_return (&msg, -1); |
419 | } |
420 | |
421 | // Copy message to capture socket if any. |
422 | rc = capture (capture_, &msg); |
423 | CHECK_RC_EXIT_ON_FAILURE (); |
424 | |
425 | if (msg.size () == 5 && memcmp (msg.data (), "PAUSE" , 5) == 0) { |
426 | state = paused; |
427 | poller_wait = poller_control; |
428 | } else if (msg.size () == 6 |
429 | && memcmp (msg.data (), "RESUME" , 6) == 0) { |
430 | state = active; |
431 | poller_wait = poller_in; |
432 | } else { |
433 | if (msg.size () == 9 |
434 | && memcmp (msg.data (), "TERMINATE" , 9) == 0) |
435 | state = terminated; |
436 | else { |
437 | if (msg.size () == 10 |
438 | && memcmp (msg.data (), "STATISTICS" , 10) == 0) { |
439 | rc = reply_stats (control_, &frontend_stats, |
440 | &backend_stats); |
441 | CHECK_RC_EXIT_ON_FAILURE (); |
442 | } else { |
443 | // This is an API error, we assert |
444 | puts ("E: invalid command sent to proxy" ); |
445 | zmq_assert (false); |
446 | } |
447 | } |
448 | } |
449 | control_in = false; |
450 | } |
451 | |
452 | if (state == active) { |
453 | // Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'. |
454 | // In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event. |
455 | if (frontend_in && (backend_out || frontend_equal_to_backend)) { |
456 | rc = forward (frontend_, &frontend_stats, backend_, |
457 | &backend_stats, capture_, &msg); |
458 | CHECK_RC_EXIT_ON_FAILURE (); |
459 | request_processed = true; |
460 | frontend_in = backend_out = false; |
461 | } else |
462 | request_processed = false; |
463 | |
464 | // Process a reply, 'ZMQ_POLLIN' on 'backend_' and 'ZMQ_POLLOUT' on 'frontend_'. |
465 | // If 'frontend_' and 'backend_' are the same this is not needed because previous processing |
466 | // covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to |
467 | // design in 'for' event processing loop. |
468 | if (backend_in && frontend_out) { |
469 | rc = forward (backend_, &backend_stats, frontend_, |
470 | &frontend_stats, capture_, &msg); |
471 | CHECK_RC_EXIT_ON_FAILURE (); |
472 | reply_processed = true; |
473 | backend_in = frontend_out = false; |
474 | } else |
475 | reply_processed = false; |
476 | |
477 | if (request_processed || reply_processed) { |
478 | // If request/reply is processed that means we had at least one 'ZMQ_POLLOUT' event. |
479 | // Enable corresponding 'ZMQ_POLLIN' for blocking wait if any was disabled. |
480 | if (poller_wait != poller_in) { |
481 | if (request_processed) { // 'frontend_' -> 'backend_' |
482 | if (poller_wait == poller_both_blocked) |
483 | poller_wait = poller_send_blocked; |
484 | else if (poller_wait == poller_receive_blocked |
485 | || poller_wait == poller_frontend_only) |
486 | poller_wait = poller_in; |
487 | } |
488 | if (reply_processed) { // 'backend_' -> 'frontend_' |
489 | if (poller_wait == poller_both_blocked) |
490 | poller_wait = poller_receive_blocked; |
491 | else if (poller_wait == poller_send_blocked |
492 | || poller_wait == poller_backend_only) |
493 | poller_wait = poller_in; |
494 | } |
495 | } |
496 | } else { |
497 | // No requests have been processed, there were no 'ZMQ_POLLIN' with corresponding 'ZMQ_POLLOUT' events. |
498 | // That means that out queue(s) is/are full or one out queue is full and second one has no messages to process. |
499 | // Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT', |
500 | // or wait only on both 'backend_''s or 'frontend_''s 'ZMQ_POLLIN' and 'ZMQ_POLLOUT'. |
501 | if (frontend_in) { |
502 | if (frontend_out) |
503 | // If frontend_in and frontend_out are true, obviously backend_in and backend_out are both false. |
504 | // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'backend_'. |
505 | // We'll never get here in case of frontend_==backend_ because then frontend_out will always be false. |
506 | poller_wait = poller_backend_only; |
507 | else { |
508 | if (poller_wait == poller_send_blocked) |
509 | poller_wait = poller_both_blocked; |
510 | else if (poller_wait == poller_in) |
511 | poller_wait = poller_receive_blocked; |
512 | } |
513 | } |
514 | if (backend_in) { |
515 | // Will never be reached if frontend_==backend_, 'backend_in' will |
516 | // always be false due to design in 'for' event processing loop. |
517 | if (backend_out) |
518 | // If backend_in and backend_out are true, obviously frontend_in and frontend_out are both false. |
519 | // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'frontend_'. |
520 | poller_wait = poller_frontend_only; |
521 | else { |
522 | if (poller_wait == poller_receive_blocked) |
523 | poller_wait = poller_both_blocked; |
524 | else if (poller_wait == poller_in) |
525 | poller_wait = poller_send_blocked; |
526 | } |
527 | } |
528 | } |
529 | } |
530 | } |
531 | PROXY_CLEANUP (); |
532 | return close_and_return (&msg, 0); |
533 | } |
534 | |
535 | #else // ZMQ_HAVE_POLLER |
536 | |
537 | int zmq::proxy (class socket_base_t *frontend_, |
538 | class socket_base_t *backend_, |
539 | class socket_base_t *capture_, |
540 | class socket_base_t *control_) |
541 | { |
542 | msg_t msg; |
543 | int rc = msg.init (); |
544 | if (rc != 0) |
545 | return -1; |
546 | |
547 | // The algorithm below assumes ratio of requests and replies processed |
548 | // under full load to be 1:1. |
549 | |
550 | int more; |
551 | size_t moresz; |
552 | zmq_pollitem_t items[] = {{frontend_, 0, ZMQ_POLLIN, 0}, |
553 | {backend_, 0, ZMQ_POLLIN, 0}, |
554 | {control_, 0, ZMQ_POLLIN, 0}}; |
555 | int qt_poll_items = (control_ ? 3 : 2); |
556 | zmq_pollitem_t itemsout[] = {{frontend_, 0, ZMQ_POLLOUT, 0}, |
557 | {backend_, 0, ZMQ_POLLOUT, 0}}; |
558 | |
559 | zmq_socket_stats_t frontend_stats; |
560 | memset (&frontend_stats, 0, sizeof (frontend_stats)); |
561 | zmq_socket_stats_t backend_stats; |
562 | memset (&backend_stats, 0, sizeof (backend_stats)); |
563 | |
564 | // Proxy can be in these three states |
565 | enum |
566 | { |
567 | active, |
568 | paused, |
569 | terminated |
570 | } state = active; |
571 | |
572 | while (state != terminated) { |
573 | // Wait while there are either requests or replies to process. |
574 | rc = zmq_poll (&items[0], qt_poll_items, -1); |
575 | if (unlikely (rc < 0)) |
576 | return close_and_return (&msg, -1); |
577 | |
578 | // Get the pollout separately because when combining this with pollin it maxes the CPU |
579 | // because pollout shall most of the time return directly. |
580 | // POLLOUT is only checked when frontend and backend sockets are not the same. |
581 | if (frontend_ != backend_) { |
582 | rc = zmq_poll (&itemsout[0], 2, 0); |
583 | if (unlikely (rc < 0)) { |
584 | return close_and_return (&msg, -1); |
585 | } |
586 | } |
587 | |
588 | // Process a control command if any |
589 | if (control_ && items[2].revents & ZMQ_POLLIN) { |
590 | rc = control_->recv (&msg, 0); |
591 | if (unlikely (rc < 0)) |
592 | return close_and_return (&msg, -1); |
593 | |
594 | moresz = sizeof more; |
595 | rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); |
596 | if (unlikely (rc < 0) || more) |
597 | return close_and_return (&msg, -1); |
598 | |
599 | // Copy message to capture socket if any |
600 | rc = capture (capture_, &msg); |
601 | if (unlikely (rc < 0)) |
602 | return close_and_return (&msg, -1); |
603 | |
604 | if (msg.size () == 5 && memcmp (msg.data (), "PAUSE" , 5) == 0) |
605 | state = paused; |
606 | else if (msg.size () == 6 && memcmp (msg.data (), "RESUME" , 6) == 0) |
607 | state = active; |
608 | else if (msg.size () == 9 |
609 | && memcmp (msg.data (), "TERMINATE" , 9) == 0) |
610 | state = terminated; |
611 | else { |
612 | if (msg.size () == 10 |
613 | && memcmp (msg.data (), "STATISTICS" , 10) == 0) { |
614 | rc = |
615 | reply_stats (control_, &frontend_stats, &backend_stats); |
616 | if (unlikely (rc < 0)) |
617 | return close_and_return (&msg, -1); |
618 | } else { |
619 | // This is an API error, we assert |
620 | puts ("E: invalid command sent to proxy" ); |
621 | zmq_assert (false); |
622 | } |
623 | } |
624 | } |
625 | // Process a request |
626 | if (state == active && items[0].revents & ZMQ_POLLIN |
627 | && (frontend_ == backend_ || itemsout[1].revents & ZMQ_POLLOUT)) { |
628 | rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, |
629 | capture_, &msg); |
630 | if (unlikely (rc < 0)) |
631 | return close_and_return (&msg, -1); |
632 | } |
633 | // Process a reply |
634 | if (state == active && frontend_ != backend_ |
635 | && items[1].revents & ZMQ_POLLIN |
636 | && itemsout[0].revents & ZMQ_POLLOUT) { |
637 | rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, |
638 | capture_, &msg); |
639 | if (unlikely (rc < 0)) |
640 | return close_and_return (&msg, -1); |
641 | } |
642 | } |
643 | |
644 | return close_and_return (&msg, 0); |
645 | } |
646 | |
647 | #endif // ZMQ_HAVE_POLLER |
648 | |