1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | #include "macros.hpp" |
32 | #include "router.hpp" |
33 | #include "pipe.hpp" |
34 | #include "wire.hpp" |
35 | #include "random.hpp" |
36 | #include "likely.hpp" |
37 | #include "err.hpp" |
38 | |
39 | zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : |
40 | routing_socket_base_t (parent_, tid_, sid_), |
41 | _prefetched (false), |
42 | _routing_id_sent (false), |
43 | _current_in (NULL), |
44 | _terminate_current_in (false), |
45 | _more_in (false), |
46 | _current_out (NULL), |
47 | _more_out (false), |
48 | _next_integral_routing_id (generate_random ()), |
49 | _mandatory (false), |
50 | // raw_socket functionality in ROUTER is deprecated |
51 | _raw_socket (false), |
52 | _probe_router (false), |
53 | _handover (false) |
54 | { |
55 | options.type = ZMQ_ROUTER; |
56 | options.recv_routing_id = true; |
57 | options.raw_socket = false; |
58 | |
59 | _prefetched_id.init (); |
60 | _prefetched_msg.init (); |
61 | } |
62 | |
63 | zmq::router_t::~router_t () |
64 | { |
65 | zmq_assert (_anonymous_pipes.empty ()); |
66 | _prefetched_id.close (); |
67 | _prefetched_msg.close (); |
68 | } |
69 | |
70 | void zmq::router_t::xattach_pipe (pipe_t *pipe_, |
71 | bool subscribe_to_all_, |
72 | bool locally_initiated_) |
73 | { |
74 | LIBZMQ_UNUSED (subscribe_to_all_); |
75 | |
76 | zmq_assert (pipe_); |
77 | |
78 | if (_probe_router) { |
79 | msg_t probe_msg; |
80 | int rc = probe_msg.init (); |
81 | errno_assert (rc == 0); |
82 | |
83 | rc = pipe_->write (&probe_msg); |
84 | // zmq_assert (rc) is not applicable here, since it is not a bug. |
85 | LIBZMQ_UNUSED (rc); |
86 | |
87 | pipe_->flush (); |
88 | |
89 | rc = probe_msg.close (); |
90 | errno_assert (rc == 0); |
91 | } |
92 | |
93 | bool routing_id_ok = identify_peer (pipe_, locally_initiated_); |
94 | if (routing_id_ok) |
95 | _fq.attach (pipe_); |
96 | else |
97 | _anonymous_pipes.insert (pipe_); |
98 | } |
99 | |
100 | int zmq::router_t::xsetsockopt (int option_, |
101 | const void *optval_, |
102 | size_t optvallen_) |
103 | { |
104 | const bool is_int = (optvallen_ == sizeof (int)); |
105 | int value = 0; |
106 | if (is_int) |
107 | memcpy (&value, optval_, sizeof (int)); |
108 | |
109 | switch (option_) { |
110 | case ZMQ_ROUTER_RAW: |
111 | if (is_int && value >= 0) { |
112 | _raw_socket = (value != 0); |
113 | if (_raw_socket) { |
114 | options.recv_routing_id = false; |
115 | options.raw_socket = true; |
116 | } |
117 | return 0; |
118 | } |
119 | break; |
120 | |
121 | case ZMQ_ROUTER_MANDATORY: |
122 | if (is_int && value >= 0) { |
123 | _mandatory = (value != 0); |
124 | return 0; |
125 | } |
126 | break; |
127 | |
128 | case ZMQ_PROBE_ROUTER: |
129 | if (is_int && value >= 0) { |
130 | _probe_router = (value != 0); |
131 | return 0; |
132 | } |
133 | break; |
134 | |
135 | case ZMQ_ROUTER_HANDOVER: |
136 | if (is_int && value >= 0) { |
137 | _handover = (value != 0); |
138 | return 0; |
139 | } |
140 | break; |
141 | |
142 | #ifdef ZMQ_BUILD_DRAFT_API |
143 | case ZMQ_ROUTER_NOTIFY: |
144 | if (is_int && value >= 0 |
145 | && value <= (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT)) { |
146 | options.router_notify = value; |
147 | return 0; |
148 | } |
149 | break; |
150 | #endif |
151 | |
152 | default: |
153 | return routing_socket_base_t::xsetsockopt (option_, optval_, |
154 | optvallen_); |
155 | } |
156 | errno = EINVAL; |
157 | return -1; |
158 | } |
159 | |
160 | |
161 | void zmq::router_t::xpipe_terminated (pipe_t *pipe_) |
162 | { |
163 | if (0 == _anonymous_pipes.erase (pipe_)) { |
164 | erase_out_pipe (pipe_); |
165 | _fq.pipe_terminated (pipe_); |
166 | pipe_->rollback (); |
167 | if (pipe_ == _current_out) |
168 | _current_out = NULL; |
169 | } |
170 | } |
171 | |
172 | void zmq::router_t::xread_activated (pipe_t *pipe_) |
173 | { |
174 | std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_); |
175 | if (it == _anonymous_pipes.end ()) |
176 | _fq.activated (pipe_); |
177 | else { |
178 | const bool routing_id_ok = identify_peer (pipe_, false); |
179 | if (routing_id_ok) { |
180 | _anonymous_pipes.erase (it); |
181 | _fq.attach (pipe_); |
182 | } |
183 | } |
184 | } |
185 | |
186 | int zmq::router_t::xsend (msg_t *msg_) |
187 | { |
188 | // If this is the first part of the message it's the ID of the |
189 | // peer to send the message to. |
190 | if (!_more_out) { |
191 | zmq_assert (!_current_out); |
192 | |
193 | // If we have malformed message (prefix with no subsequent message) |
194 | // then just silently ignore it. |
195 | // TODO: The connections should be killed instead. |
196 | if (msg_->flags () & msg_t::more) { |
197 | _more_out = true; |
198 | |
199 | // Find the pipe associated with the routing id stored in the prefix. |
200 | // If there's no such pipe just silently ignore the message, unless |
201 | // router_mandatory is set. |
202 | out_pipe_t *out_pipe = lookup_out_pipe ( |
203 | blob_t (static_cast<unsigned char *> (msg_->data ()), |
204 | msg_->size (), zmq::reference_tag_t ())); |
205 | |
206 | if (out_pipe) { |
207 | _current_out = out_pipe->pipe; |
208 | |
209 | // Check whether pipe is closed or not |
210 | if (!_current_out->check_write ()) { |
211 | // Check whether pipe is full or not |
212 | bool pipe_full = !_current_out->check_hwm (); |
213 | out_pipe->active = false; |
214 | _current_out = NULL; |
215 | |
216 | if (_mandatory) { |
217 | _more_out = false; |
218 | if (pipe_full) |
219 | errno = EAGAIN; |
220 | else |
221 | errno = EHOSTUNREACH; |
222 | return -1; |
223 | } |
224 | } |
225 | } else if (_mandatory) { |
226 | _more_out = false; |
227 | errno = EHOSTUNREACH; |
228 | return -1; |
229 | } |
230 | } |
231 | |
232 | int rc = msg_->close (); |
233 | errno_assert (rc == 0); |
234 | rc = msg_->init (); |
235 | errno_assert (rc == 0); |
236 | return 0; |
237 | } |
238 | |
239 | // Ignore the MORE flag for raw-sock or assert? |
240 | if (options.raw_socket) |
241 | msg_->reset_flags (msg_t::more); |
242 | |
243 | // Check whether this is the last part of the message. |
244 | _more_out = (msg_->flags () & msg_t::more) != 0; |
245 | |
246 | // Push the message into the pipe. If there's no out pipe, just drop it. |
247 | if (_current_out) { |
248 | // Close the remote connection if user has asked to do so |
249 | // by sending zero length message. |
250 | // Pending messages in the pipe will be dropped (on receiving term- ack) |
251 | if (_raw_socket && msg_->size () == 0) { |
252 | _current_out->terminate (false); |
253 | int rc = msg_->close (); |
254 | errno_assert (rc == 0); |
255 | rc = msg_->init (); |
256 | errno_assert (rc == 0); |
257 | _current_out = NULL; |
258 | return 0; |
259 | } |
260 | |
261 | bool ok = _current_out->write (msg_); |
262 | if (unlikely (!ok)) { |
263 | // Message failed to send - we must close it ourselves. |
264 | int rc = msg_->close (); |
265 | errno_assert (rc == 0); |
266 | // HWM was checked before, so the pipe must be gone. Roll back |
267 | // messages that were piped, for example REP labels. |
268 | _current_out->rollback (); |
269 | _current_out = NULL; |
270 | } else { |
271 | if (!_more_out) { |
272 | _current_out->flush (); |
273 | _current_out = NULL; |
274 | } |
275 | } |
276 | } else { |
277 | int rc = msg_->close (); |
278 | errno_assert (rc == 0); |
279 | } |
280 | |
281 | // Detach the message from the data buffer. |
282 | int rc = msg_->init (); |
283 | errno_assert (rc == 0); |
284 | |
285 | return 0; |
286 | } |
287 | |
288 | int zmq::router_t::xrecv (msg_t *msg_) |
289 | { |
290 | if (_prefetched) { |
291 | if (!_routing_id_sent) { |
292 | int rc = msg_->move (_prefetched_id); |
293 | errno_assert (rc == 0); |
294 | _routing_id_sent = true; |
295 | } else { |
296 | int rc = msg_->move (_prefetched_msg); |
297 | errno_assert (rc == 0); |
298 | _prefetched = false; |
299 | } |
300 | _more_in = (msg_->flags () & msg_t::more) != 0; |
301 | |
302 | if (!_more_in) { |
303 | if (_terminate_current_in) { |
304 | _current_in->terminate (true); |
305 | _terminate_current_in = false; |
306 | } |
307 | _current_in = NULL; |
308 | } |
309 | return 0; |
310 | } |
311 | |
312 | pipe_t *pipe = NULL; |
313 | int rc = _fq.recvpipe (msg_, &pipe); |
314 | |
315 | // It's possible that we receive peer's routing id. That happens |
316 | // after reconnection. The current implementation assumes that |
317 | // the peer always uses the same routing id. |
318 | while (rc == 0 && msg_->is_routing_id ()) |
319 | rc = _fq.recvpipe (msg_, &pipe); |
320 | |
321 | if (rc != 0) |
322 | return -1; |
323 | |
324 | zmq_assert (pipe != NULL); |
325 | |
326 | // If we are in the middle of reading a message, just return the next part. |
327 | if (_more_in) { |
328 | _more_in = (msg_->flags () & msg_t::more) != 0; |
329 | |
330 | if (!_more_in) { |
331 | if (_terminate_current_in) { |
332 | _current_in->terminate (true); |
333 | _terminate_current_in = false; |
334 | } |
335 | _current_in = NULL; |
336 | } |
337 | } else { |
338 | // We are at the beginning of a message. |
339 | // Keep the message part we have in the prefetch buffer |
340 | // and return the ID of the peer instead. |
341 | rc = _prefetched_msg.move (*msg_); |
342 | errno_assert (rc == 0); |
343 | _prefetched = true; |
344 | _current_in = pipe; |
345 | |
346 | const blob_t &routing_id = pipe->get_routing_id (); |
347 | rc = msg_->init_size (routing_id.size ()); |
348 | errno_assert (rc == 0); |
349 | memcpy (msg_->data (), routing_id.data (), routing_id.size ()); |
350 | msg_->set_flags (msg_t::more); |
351 | if (_prefetched_msg.metadata ()) |
352 | msg_->set_metadata (_prefetched_msg.metadata ()); |
353 | _routing_id_sent = true; |
354 | } |
355 | |
356 | return 0; |
357 | } |
358 | |
359 | int zmq::router_t::rollback () |
360 | { |
361 | if (_current_out) { |
362 | _current_out->rollback (); |
363 | _current_out = NULL; |
364 | _more_out = false; |
365 | } |
366 | return 0; |
367 | } |
368 | |
369 | bool zmq::router_t::xhas_in () |
370 | { |
371 | // If we are in the middle of reading the messages, there are |
372 | // definitely more parts available. |
373 | if (_more_in) |
374 | return true; |
375 | |
376 | // We may already have a message pre-fetched. |
377 | if (_prefetched) |
378 | return true; |
379 | |
380 | // Try to read the next message. |
381 | // The message, if read, is kept in the pre-fetch buffer. |
382 | pipe_t *pipe = NULL; |
383 | int rc = _fq.recvpipe (&_prefetched_msg, &pipe); |
384 | |
385 | // It's possible that we receive peer's routing id. That happens |
386 | // after reconnection. The current implementation assumes that |
387 | // the peer always uses the same routing id. |
388 | // TODO: handle the situation when the peer changes its routing id. |
389 | while (rc == 0 && _prefetched_msg.is_routing_id ()) |
390 | rc = _fq.recvpipe (&_prefetched_msg, &pipe); |
391 | |
392 | if (rc != 0) |
393 | return false; |
394 | |
395 | zmq_assert (pipe != NULL); |
396 | |
397 | const blob_t &routing_id = pipe->get_routing_id (); |
398 | rc = _prefetched_id.init_size (routing_id.size ()); |
399 | errno_assert (rc == 0); |
400 | memcpy (_prefetched_id.data (), routing_id.data (), routing_id.size ()); |
401 | _prefetched_id.set_flags (msg_t::more); |
402 | |
403 | _prefetched = true; |
404 | _routing_id_sent = false; |
405 | _current_in = pipe; |
406 | |
407 | return true; |
408 | } |
409 | |
410 | static bool check_pipe_hwm (const zmq::pipe_t &pipe_) |
411 | { |
412 | return pipe_.check_hwm (); |
413 | } |
414 | |
415 | bool zmq::router_t::xhas_out () |
416 | { |
417 | // In theory, ROUTER socket is always ready for writing (except when |
418 | // MANDATORY is set). Whether actual attempt to write succeeds depends |
419 | // on whitch pipe the message is going to be routed to. |
420 | |
421 | if (!_mandatory) |
422 | return true; |
423 | |
424 | return any_of_out_pipes (check_pipe_hwm); |
425 | } |
426 | |
427 | int zmq::router_t::get_peer_state (const void *routing_id_, |
428 | size_t routing_id_size_) const |
429 | { |
430 | int res = 0; |
431 | |
432 | // TODO remove the const_cast, see comment in lookup_out_pipe |
433 | const blob_t routing_id_blob ( |
434 | static_cast<unsigned char *> (const_cast<void *> (routing_id_)), |
435 | routing_id_size_); |
436 | const out_pipe_t *out_pipe = lookup_out_pipe (routing_id_blob); |
437 | if (!out_pipe) { |
438 | errno = EHOSTUNREACH; |
439 | return -1; |
440 | } |
441 | |
442 | if (out_pipe->pipe->check_hwm ()) |
443 | res |= ZMQ_POLLOUT; |
444 | |
445 | /** \todo does it make any sense to check the inpipe as well? */ |
446 | |
447 | return res; |
448 | } |
449 | |
450 | bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_) |
451 | { |
452 | msg_t msg; |
453 | blob_t routing_id; |
454 | |
455 | if (locally_initiated_ && connect_routing_id_is_set ()) { |
456 | const std::string connect_routing_id = extract_connect_routing_id (); |
457 | routing_id.set ( |
458 | reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()), |
459 | connect_routing_id.length ()); |
460 | // Not allowed to duplicate an existing rid |
461 | zmq_assert (!has_out_pipe (routing_id)); |
462 | } else if ( |
463 | options |
464 | .raw_socket) { // Always assign an integral routing id for raw-socket |
465 | unsigned char buf[5]; |
466 | buf[0] = 0; |
467 | put_uint32 (buf + 1, _next_integral_routing_id++); |
468 | routing_id.set (buf, sizeof buf); |
469 | } else if (!options.raw_socket) { |
470 | // Pick up handshake cases and also case where next integral routing id is set |
471 | msg.init (); |
472 | bool ok = pipe_->read (&msg); |
473 | if (!ok) |
474 | return false; |
475 | |
476 | if (msg.size () == 0) { |
477 | // Fall back on the auto-generation |
478 | unsigned char buf[5]; |
479 | buf[0] = 0; |
480 | put_uint32 (buf + 1, _next_integral_routing_id++); |
481 | routing_id.set (buf, sizeof buf); |
482 | msg.close (); |
483 | } else { |
484 | routing_id.set (static_cast<unsigned char *> (msg.data ()), |
485 | msg.size ()); |
486 | msg.close (); |
487 | |
488 | // Try to remove an existing routing id entry to allow the new |
489 | // connection to take the routing id. |
490 | out_pipe_t *existing_outpipe = lookup_out_pipe (routing_id); |
491 | |
492 | if (existing_outpipe) { |
493 | if (!_handover) |
494 | // Ignore peers with duplicate ID |
495 | return false; |
496 | |
497 | // We will allow the new connection to take over this |
498 | // routing id. Temporarily assign a new routing id to the |
499 | // existing pipe so we can terminate it asynchronously. |
500 | unsigned char buf[5]; |
501 | buf[0] = 0; |
502 | put_uint32 (buf + 1, _next_integral_routing_id++); |
503 | blob_t new_routing_id (buf, sizeof buf); |
504 | |
505 | pipe_t *const old_pipe = existing_outpipe->pipe; |
506 | |
507 | erase_out_pipe (old_pipe); |
508 | old_pipe->set_router_socket_routing_id (new_routing_id); |
509 | add_out_pipe (ZMQ_MOVE (new_routing_id), old_pipe); |
510 | |
511 | if (old_pipe == _current_in) |
512 | _terminate_current_in = true; |
513 | else |
514 | old_pipe->terminate (true); |
515 | } |
516 | } |
517 | } |
518 | |
519 | pipe_->set_router_socket_routing_id (routing_id); |
520 | add_out_pipe (ZMQ_MOVE (routing_id), pipe_); |
521 | |
522 | return true; |
523 | } |
524 | |