1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "macros.hpp"
32#include "router.hpp"
33#include "pipe.hpp"
34#include "wire.hpp"
35#include "random.hpp"
36#include "likely.hpp"
37#include "err.hpp"
38
39zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40 routing_socket_base_t (parent_, tid_, sid_),
41 _prefetched (false),
42 _routing_id_sent (false),
43 _current_in (NULL),
44 _terminate_current_in (false),
45 _more_in (false),
46 _current_out (NULL),
47 _more_out (false),
48 _next_integral_routing_id (generate_random ()),
49 _mandatory (false),
50 // raw_socket functionality in ROUTER is deprecated
51 _raw_socket (false),
52 _probe_router (false),
53 _handover (false)
54{
55 options.type = ZMQ_ROUTER;
56 options.recv_routing_id = true;
57 options.raw_socket = false;
58
59 _prefetched_id.init ();
60 _prefetched_msg.init ();
61}
62
63zmq::router_t::~router_t ()
64{
65 zmq_assert (_anonymous_pipes.empty ());
66 _prefetched_id.close ();
67 _prefetched_msg.close ();
68}
69
70void zmq::router_t::xattach_pipe (pipe_t *pipe_,
71 bool subscribe_to_all_,
72 bool locally_initiated_)
73{
74 LIBZMQ_UNUSED (subscribe_to_all_);
75
76 zmq_assert (pipe_);
77
78 if (_probe_router) {
79 msg_t probe_msg;
80 int rc = probe_msg.init ();
81 errno_assert (rc == 0);
82
83 rc = pipe_->write (&probe_msg);
84 // zmq_assert (rc) is not applicable here, since it is not a bug.
85 LIBZMQ_UNUSED (rc);
86
87 pipe_->flush ();
88
89 rc = probe_msg.close ();
90 errno_assert (rc == 0);
91 }
92
93 bool routing_id_ok = identify_peer (pipe_, locally_initiated_);
94 if (routing_id_ok)
95 _fq.attach (pipe_);
96 else
97 _anonymous_pipes.insert (pipe_);
98}
99
100int zmq::router_t::xsetsockopt (int option_,
101 const void *optval_,
102 size_t optvallen_)
103{
104 const bool is_int = (optvallen_ == sizeof (int));
105 int value = 0;
106 if (is_int)
107 memcpy (&value, optval_, sizeof (int));
108
109 switch (option_) {
110 case ZMQ_ROUTER_RAW:
111 if (is_int && value >= 0) {
112 _raw_socket = (value != 0);
113 if (_raw_socket) {
114 options.recv_routing_id = false;
115 options.raw_socket = true;
116 }
117 return 0;
118 }
119 break;
120
121 case ZMQ_ROUTER_MANDATORY:
122 if (is_int && value >= 0) {
123 _mandatory = (value != 0);
124 return 0;
125 }
126 break;
127
128 case ZMQ_PROBE_ROUTER:
129 if (is_int && value >= 0) {
130 _probe_router = (value != 0);
131 return 0;
132 }
133 break;
134
135 case ZMQ_ROUTER_HANDOVER:
136 if (is_int && value >= 0) {
137 _handover = (value != 0);
138 return 0;
139 }
140 break;
141
142#ifdef ZMQ_BUILD_DRAFT_API
143 case ZMQ_ROUTER_NOTIFY:
144 if (is_int && value >= 0
145 && value <= (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT)) {
146 options.router_notify = value;
147 return 0;
148 }
149 break;
150#endif
151
152 default:
153 return routing_socket_base_t::xsetsockopt (option_, optval_,
154 optvallen_);
155 }
156 errno = EINVAL;
157 return -1;
158}
159
160
161void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
162{
163 if (0 == _anonymous_pipes.erase (pipe_)) {
164 erase_out_pipe (pipe_);
165 _fq.pipe_terminated (pipe_);
166 pipe_->rollback ();
167 if (pipe_ == _current_out)
168 _current_out = NULL;
169 }
170}
171
172void zmq::router_t::xread_activated (pipe_t *pipe_)
173{
174 std::set<pipe_t *>::iterator it = _anonymous_pipes.find (pipe_);
175 if (it == _anonymous_pipes.end ())
176 _fq.activated (pipe_);
177 else {
178 const bool routing_id_ok = identify_peer (pipe_, false);
179 if (routing_id_ok) {
180 _anonymous_pipes.erase (it);
181 _fq.attach (pipe_);
182 }
183 }
184}
185
186int zmq::router_t::xsend (msg_t *msg_)
187{
188 // If this is the first part of the message it's the ID of the
189 // peer to send the message to.
190 if (!_more_out) {
191 zmq_assert (!_current_out);
192
193 // If we have malformed message (prefix with no subsequent message)
194 // then just silently ignore it.
195 // TODO: The connections should be killed instead.
196 if (msg_->flags () & msg_t::more) {
197 _more_out = true;
198
199 // Find the pipe associated with the routing id stored in the prefix.
200 // If there's no such pipe just silently ignore the message, unless
201 // router_mandatory is set.
202 out_pipe_t *out_pipe = lookup_out_pipe (
203 blob_t (static_cast<unsigned char *> (msg_->data ()),
204 msg_->size (), zmq::reference_tag_t ()));
205
206 if (out_pipe) {
207 _current_out = out_pipe->pipe;
208
209 // Check whether pipe is closed or not
210 if (!_current_out->check_write ()) {
211 // Check whether pipe is full or not
212 bool pipe_full = !_current_out->check_hwm ();
213 out_pipe->active = false;
214 _current_out = NULL;
215
216 if (_mandatory) {
217 _more_out = false;
218 if (pipe_full)
219 errno = EAGAIN;
220 else
221 errno = EHOSTUNREACH;
222 return -1;
223 }
224 }
225 } else if (_mandatory) {
226 _more_out = false;
227 errno = EHOSTUNREACH;
228 return -1;
229 }
230 }
231
232 int rc = msg_->close ();
233 errno_assert (rc == 0);
234 rc = msg_->init ();
235 errno_assert (rc == 0);
236 return 0;
237 }
238
239 // Ignore the MORE flag for raw-sock or assert?
240 if (options.raw_socket)
241 msg_->reset_flags (msg_t::more);
242
243 // Check whether this is the last part of the message.
244 _more_out = (msg_->flags () & msg_t::more) != 0;
245
246 // Push the message into the pipe. If there's no out pipe, just drop it.
247 if (_current_out) {
248 // Close the remote connection if user has asked to do so
249 // by sending zero length message.
250 // Pending messages in the pipe will be dropped (on receiving term- ack)
251 if (_raw_socket && msg_->size () == 0) {
252 _current_out->terminate (false);
253 int rc = msg_->close ();
254 errno_assert (rc == 0);
255 rc = msg_->init ();
256 errno_assert (rc == 0);
257 _current_out = NULL;
258 return 0;
259 }
260
261 bool ok = _current_out->write (msg_);
262 if (unlikely (!ok)) {
263 // Message failed to send - we must close it ourselves.
264 int rc = msg_->close ();
265 errno_assert (rc == 0);
266 // HWM was checked before, so the pipe must be gone. Roll back
267 // messages that were piped, for example REP labels.
268 _current_out->rollback ();
269 _current_out = NULL;
270 } else {
271 if (!_more_out) {
272 _current_out->flush ();
273 _current_out = NULL;
274 }
275 }
276 } else {
277 int rc = msg_->close ();
278 errno_assert (rc == 0);
279 }
280
281 // Detach the message from the data buffer.
282 int rc = msg_->init ();
283 errno_assert (rc == 0);
284
285 return 0;
286}
287
288int zmq::router_t::xrecv (msg_t *msg_)
289{
290 if (_prefetched) {
291 if (!_routing_id_sent) {
292 int rc = msg_->move (_prefetched_id);
293 errno_assert (rc == 0);
294 _routing_id_sent = true;
295 } else {
296 int rc = msg_->move (_prefetched_msg);
297 errno_assert (rc == 0);
298 _prefetched = false;
299 }
300 _more_in = (msg_->flags () & msg_t::more) != 0;
301
302 if (!_more_in) {
303 if (_terminate_current_in) {
304 _current_in->terminate (true);
305 _terminate_current_in = false;
306 }
307 _current_in = NULL;
308 }
309 return 0;
310 }
311
312 pipe_t *pipe = NULL;
313 int rc = _fq.recvpipe (msg_, &pipe);
314
315 // It's possible that we receive peer's routing id. That happens
316 // after reconnection. The current implementation assumes that
317 // the peer always uses the same routing id.
318 while (rc == 0 && msg_->is_routing_id ())
319 rc = _fq.recvpipe (msg_, &pipe);
320
321 if (rc != 0)
322 return -1;
323
324 zmq_assert (pipe != NULL);
325
326 // If we are in the middle of reading a message, just return the next part.
327 if (_more_in) {
328 _more_in = (msg_->flags () & msg_t::more) != 0;
329
330 if (!_more_in) {
331 if (_terminate_current_in) {
332 _current_in->terminate (true);
333 _terminate_current_in = false;
334 }
335 _current_in = NULL;
336 }
337 } else {
338 // We are at the beginning of a message.
339 // Keep the message part we have in the prefetch buffer
340 // and return the ID of the peer instead.
341 rc = _prefetched_msg.move (*msg_);
342 errno_assert (rc == 0);
343 _prefetched = true;
344 _current_in = pipe;
345
346 const blob_t &routing_id = pipe->get_routing_id ();
347 rc = msg_->init_size (routing_id.size ());
348 errno_assert (rc == 0);
349 memcpy (msg_->data (), routing_id.data (), routing_id.size ());
350 msg_->set_flags (msg_t::more);
351 if (_prefetched_msg.metadata ())
352 msg_->set_metadata (_prefetched_msg.metadata ());
353 _routing_id_sent = true;
354 }
355
356 return 0;
357}
358
359int zmq::router_t::rollback ()
360{
361 if (_current_out) {
362 _current_out->rollback ();
363 _current_out = NULL;
364 _more_out = false;
365 }
366 return 0;
367}
368
369bool zmq::router_t::xhas_in ()
370{
371 // If we are in the middle of reading the messages, there are
372 // definitely more parts available.
373 if (_more_in)
374 return true;
375
376 // We may already have a message pre-fetched.
377 if (_prefetched)
378 return true;
379
380 // Try to read the next message.
381 // The message, if read, is kept in the pre-fetch buffer.
382 pipe_t *pipe = NULL;
383 int rc = _fq.recvpipe (&_prefetched_msg, &pipe);
384
385 // It's possible that we receive peer's routing id. That happens
386 // after reconnection. The current implementation assumes that
387 // the peer always uses the same routing id.
388 // TODO: handle the situation when the peer changes its routing id.
389 while (rc == 0 && _prefetched_msg.is_routing_id ())
390 rc = _fq.recvpipe (&_prefetched_msg, &pipe);
391
392 if (rc != 0)
393 return false;
394
395 zmq_assert (pipe != NULL);
396
397 const blob_t &routing_id = pipe->get_routing_id ();
398 rc = _prefetched_id.init_size (routing_id.size ());
399 errno_assert (rc == 0);
400 memcpy (_prefetched_id.data (), routing_id.data (), routing_id.size ());
401 _prefetched_id.set_flags (msg_t::more);
402
403 _prefetched = true;
404 _routing_id_sent = false;
405 _current_in = pipe;
406
407 return true;
408}
409
410static bool check_pipe_hwm (const zmq::pipe_t &pipe_)
411{
412 return pipe_.check_hwm ();
413}
414
415bool zmq::router_t::xhas_out ()
416{
417 // In theory, ROUTER socket is always ready for writing (except when
418 // MANDATORY is set). Whether actual attempt to write succeeds depends
419 // on whitch pipe the message is going to be routed to.
420
421 if (!_mandatory)
422 return true;
423
424 return any_of_out_pipes (check_pipe_hwm);
425}
426
427int zmq::router_t::get_peer_state (const void *routing_id_,
428 size_t routing_id_size_) const
429{
430 int res = 0;
431
432 // TODO remove the const_cast, see comment in lookup_out_pipe
433 const blob_t routing_id_blob (
434 static_cast<unsigned char *> (const_cast<void *> (routing_id_)),
435 routing_id_size_);
436 const out_pipe_t *out_pipe = lookup_out_pipe (routing_id_blob);
437 if (!out_pipe) {
438 errno = EHOSTUNREACH;
439 return -1;
440 }
441
442 if (out_pipe->pipe->check_hwm ())
443 res |= ZMQ_POLLOUT;
444
445 /** \todo does it make any sense to check the inpipe as well? */
446
447 return res;
448}
449
450bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_)
451{
452 msg_t msg;
453 blob_t routing_id;
454
455 if (locally_initiated_ && connect_routing_id_is_set ()) {
456 const std::string connect_routing_id = extract_connect_routing_id ();
457 routing_id.set (
458 reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
459 connect_routing_id.length ());
460 // Not allowed to duplicate an existing rid
461 zmq_assert (!has_out_pipe (routing_id));
462 } else if (
463 options
464 .raw_socket) { // Always assign an integral routing id for raw-socket
465 unsigned char buf[5];
466 buf[0] = 0;
467 put_uint32 (buf + 1, _next_integral_routing_id++);
468 routing_id.set (buf, sizeof buf);
469 } else if (!options.raw_socket) {
470 // Pick up handshake cases and also case where next integral routing id is set
471 msg.init ();
472 bool ok = pipe_->read (&msg);
473 if (!ok)
474 return false;
475
476 if (msg.size () == 0) {
477 // Fall back on the auto-generation
478 unsigned char buf[5];
479 buf[0] = 0;
480 put_uint32 (buf + 1, _next_integral_routing_id++);
481 routing_id.set (buf, sizeof buf);
482 msg.close ();
483 } else {
484 routing_id.set (static_cast<unsigned char *> (msg.data ()),
485 msg.size ());
486 msg.close ();
487
488 // Try to remove an existing routing id entry to allow the new
489 // connection to take the routing id.
490 out_pipe_t *existing_outpipe = lookup_out_pipe (routing_id);
491
492 if (existing_outpipe) {
493 if (!_handover)
494 // Ignore peers with duplicate ID
495 return false;
496
497 // We will allow the new connection to take over this
498 // routing id. Temporarily assign a new routing id to the
499 // existing pipe so we can terminate it asynchronously.
500 unsigned char buf[5];
501 buf[0] = 0;
502 put_uint32 (buf + 1, _next_integral_routing_id++);
503 blob_t new_routing_id (buf, sizeof buf);
504
505 pipe_t *const old_pipe = existing_outpipe->pipe;
506
507 erase_out_pipe (old_pipe);
508 old_pipe->set_router_socket_routing_id (new_routing_id);
509 add_out_pipe (ZMQ_MOVE (new_routing_id), old_pipe);
510
511 if (old_pipe == _current_in)
512 _terminate_current_in = true;
513 else
514 old_pipe->terminate (true);
515 }
516 }
517 }
518
519 pipe_->set_router_socket_routing_id (routing_id);
520 add_out_pipe (ZMQ_MOVE (routing_id), pipe_);
521
522 return true;
523}
524