1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include <new>
32#include <string>
33#include <algorithm>
34#include <limits>
35
36#include "macros.hpp"
37
38#if defined ZMQ_HAVE_WINDOWS
39#if defined _MSC_VER
40#if defined _WIN32_WCE
41#include <cmnintrin.h>
42#else
43#include <intrin.h>
44#endif
45#endif
46#else
47#include <unistd.h>
48#include <ctype.h>
49#endif
50
51#include "socket_base.hpp"
52#include "tcp_listener.hpp"
53#include "ws_listener.hpp"
54#include "ipc_listener.hpp"
55#include "tipc_listener.hpp"
56#include "tcp_connecter.hpp"
57#include "ws_address.hpp"
58#include "wss_address.hpp"
59#include "io_thread.hpp"
60#include "session_base.hpp"
61#include "config.hpp"
62#include "pipe.hpp"
63#include "err.hpp"
64#include "ctx.hpp"
65#include "likely.hpp"
66#include "msg.hpp"
67#include "address.hpp"
68#include "ipc_address.hpp"
69#include "tcp_address.hpp"
70#include "udp_address.hpp"
71#include "tipc_address.hpp"
72#include "mailbox.hpp"
73#include "mailbox_safe.hpp"
74
75#if defined ZMQ_HAVE_VMCI
76#include "vmci_address.hpp"
77#include "vmci_listener.hpp"
78#endif
79
80#ifdef ZMQ_HAVE_OPENPGM
81#include "pgm_socket.hpp"
82#endif
83
84#include "pair.hpp"
85#include "pub.hpp"
86#include "sub.hpp"
87#include "req.hpp"
88#include "rep.hpp"
89#include "pull.hpp"
90#include "push.hpp"
91#include "dealer.hpp"
92#include "router.hpp"
93#include "xpub.hpp"
94#include "xsub.hpp"
95#include "stream.hpp"
96#include "server.hpp"
97#include "client.hpp"
98#include "radio.hpp"
99#include "dish.hpp"
100#include "gather.hpp"
101#include "scatter.hpp"
102#include "dgram.hpp"
103
104void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_,
105 pipe_t *pipe_)
106{
107 _inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_), pipe_);
108}
109
110int zmq::socket_base_t::inprocs_t::erase_pipes (
111 const std::string &endpoint_uri_str_)
112{
113 const std::pair<map_t::iterator, map_t::iterator> range =
114 _inprocs.equal_range (endpoint_uri_str_);
115 if (range.first == range.second) {
116 errno = ENOENT;
117 return -1;
118 }
119
120 for (map_t::iterator it = range.first; it != range.second; ++it)
121 it->second->terminate (true);
122 _inprocs.erase (range.first, range.second);
123 return 0;
124}
125
126void zmq::socket_base_t::inprocs_t::erase_pipe (pipe_t *pipe_)
127{
128 for (map_t::iterator it = _inprocs.begin (), end = _inprocs.end ();
129 it != end; ++it)
130 if (it->second == pipe_) {
131 _inprocs.erase (it);
132 break;
133 }
134}
135
136bool zmq::socket_base_t::check_tag () const
137{
138 return _tag == 0xbaddecaf;
139}
140
141bool zmq::socket_base_t::is_thread_safe () const
142{
143 return _thread_safe;
144}
145
146zmq::socket_base_t *zmq::socket_base_t::create (int type_,
147 class ctx_t *parent_,
148 uint32_t tid_,
149 int sid_)
150{
151 socket_base_t *s = NULL;
152 switch (type_) {
153 case ZMQ_PAIR:
154 s = new (std::nothrow) pair_t (parent_, tid_, sid_);
155 break;
156 case ZMQ_PUB:
157 s = new (std::nothrow) pub_t (parent_, tid_, sid_);
158 break;
159 case ZMQ_SUB:
160 s = new (std::nothrow) sub_t (parent_, tid_, sid_);
161 break;
162 case ZMQ_REQ:
163 s = new (std::nothrow) req_t (parent_, tid_, sid_);
164 break;
165 case ZMQ_REP:
166 s = new (std::nothrow) rep_t (parent_, tid_, sid_);
167 break;
168 case ZMQ_DEALER:
169 s = new (std::nothrow) dealer_t (parent_, tid_, sid_);
170 break;
171 case ZMQ_ROUTER:
172 s = new (std::nothrow) router_t (parent_, tid_, sid_);
173 break;
174 case ZMQ_PULL:
175 s = new (std::nothrow) pull_t (parent_, tid_, sid_);
176 break;
177 case ZMQ_PUSH:
178 s = new (std::nothrow) push_t (parent_, tid_, sid_);
179 break;
180 case ZMQ_XPUB:
181 s = new (std::nothrow) xpub_t (parent_, tid_, sid_);
182 break;
183 case ZMQ_XSUB:
184 s = new (std::nothrow) xsub_t (parent_, tid_, sid_);
185 break;
186 case ZMQ_STREAM:
187 s = new (std::nothrow) stream_t (parent_, tid_, sid_);
188 break;
189 case ZMQ_SERVER:
190 s = new (std::nothrow) server_t (parent_, tid_, sid_);
191 break;
192 case ZMQ_CLIENT:
193 s = new (std::nothrow) client_t (parent_, tid_, sid_);
194 break;
195 case ZMQ_RADIO:
196 s = new (std::nothrow) radio_t (parent_, tid_, sid_);
197 break;
198 case ZMQ_DISH:
199 s = new (std::nothrow) dish_t (parent_, tid_, sid_);
200 break;
201 case ZMQ_GATHER:
202 s = new (std::nothrow) gather_t (parent_, tid_, sid_);
203 break;
204 case ZMQ_SCATTER:
205 s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
206 break;
207 case ZMQ_DGRAM:
208 s = new (std::nothrow) dgram_t (parent_, tid_, sid_);
209 break;
210 default:
211 errno = EINVAL;
212 return NULL;
213 }
214
215 alloc_assert (s);
216
217 if (s->_mailbox == NULL) {
218 s->_destroyed = true;
219 LIBZMQ_DELETE (s);
220 return NULL;
221 }
222
223 return s;
224}
225
226zmq::socket_base_t::socket_base_t (ctx_t *parent_,
227 uint32_t tid_,
228 int sid_,
229 bool thread_safe_) :
230 own_t (parent_, tid_),
231 _tag (0xbaddecaf),
232 _ctx_terminated (false),
233 _destroyed (false),
234 _poller (NULL),
235 _handle (static_cast<poller_t::handle_t> (NULL)),
236 _last_tsc (0),
237 _ticks (0),
238 _rcvmore (false),
239 _monitor_socket (NULL),
240 _monitor_events (0),
241 _thread_safe (thread_safe_),
242 _reaper_signaler (NULL),
243 _sync (),
244 _monitor_sync ()
245{
246 options.socket_id = sid_;
247 options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
248 options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
249 options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV) != 0;
250
251 if (_thread_safe) {
252 _mailbox = new (std::nothrow) mailbox_safe_t (&_sync);
253 zmq_assert (_mailbox);
254 } else {
255 mailbox_t *m = new (std::nothrow) mailbox_t ();
256 zmq_assert (m);
257
258 if (m->get_fd () != retired_fd)
259 _mailbox = m;
260 else {
261 LIBZMQ_DELETE (m);
262 _mailbox = NULL;
263 }
264 }
265}
266
267int zmq::socket_base_t::get_peer_state (const void *routing_id_,
268 size_t routing_id_size_) const
269{
270 LIBZMQ_UNUSED (routing_id_);
271 LIBZMQ_UNUSED (routing_id_size_);
272
273 // Only ROUTER sockets support this
274 errno = ENOTSUP;
275 return -1;
276}
277
278zmq::socket_base_t::~socket_base_t ()
279{
280 if (_mailbox)
281 LIBZMQ_DELETE (_mailbox);
282
283 if (_reaper_signaler)
284 LIBZMQ_DELETE (_reaper_signaler);
285
286 scoped_lock_t lock (_monitor_sync);
287 stop_monitor ();
288
289 zmq_assert (_destroyed);
290}
291
292zmq::i_mailbox *zmq::socket_base_t::get_mailbox () const
293{
294 return _mailbox;
295}
296
297void zmq::socket_base_t::stop ()
298{
299 // Called by ctx when it is terminated (zmq_ctx_term).
300 // 'stop' command is sent from the threads that called zmq_ctx_term to
301 // the thread owning the socket. This way, blocking call in the
302 // owner thread can be interrupted.
303 send_stop ();
304}
305
306// TODO consider renaming protocol_ to scheme_ in conformance with RFC 3986
307// terminology, but this requires extensive changes to be consistent
308int zmq::socket_base_t::parse_uri (const char *uri_,
309 std::string &protocol_,
310 std::string &path_)
311{
312 zmq_assert (uri_ != NULL);
313
314 std::string uri (uri_);
315 const std::string::size_type pos = uri.find ("://");
316 if (pos == std::string::npos) {
317 errno = EINVAL;
318 return -1;
319 }
320 protocol_ = uri.substr (0, pos);
321 path_ = uri.substr (pos + 3);
322
323 if (protocol_.empty () || path_.empty ()) {
324 errno = EINVAL;
325 return -1;
326 }
327 return 0;
328}
329
330int zmq::socket_base_t::check_protocol (const std::string &protocol_) const
331{
332 // First check out whether the protocol is something we are aware of.
333 if (protocol_ != protocol_name::inproc
334#if defined ZMQ_HAVE_IPC
335 && protocol_ != protocol_name::ipc
336#endif
337 && protocol_ != protocol_name::tcp
338#ifdef ZMQ_HAVE_WS
339 && protocol_ != protocol_name::ws
340#endif
341#ifdef ZMQ_HAVE_WSS
342 && protocol_ != protocol_name::wss
343#endif
344#if defined ZMQ_HAVE_OPENPGM
345 // pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
346 && protocol_ != "pgm"
347 && protocol_ != "epgm"
348#endif
349#if defined ZMQ_HAVE_TIPC
350 // TIPC transport is only available on Linux.
351 && protocol_ != protocol_name::tipc
352#endif
353#if defined ZMQ_HAVE_NORM
354 && protocol_ != "norm"
355#endif
356#if defined ZMQ_HAVE_VMCI
357 && protocol_ != protocol_name::vmci
358#endif
359 && protocol_ != protocol_name::udp) {
360 errno = EPROTONOSUPPORT;
361 return -1;
362 }
363
364 // Check whether socket type and transport protocol match.
365 // Specifically, multicast protocols can't be combined with
366 // bi-directional messaging patterns (socket types).
367#if defined ZMQ_HAVE_OPENPGM || defined ZMQ_HAVE_NORM
368 if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm")
369 && options.type != ZMQ_PUB && options.type != ZMQ_SUB
370 && options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) {
371 errno = ENOCOMPATPROTO;
372 return -1;
373 }
374#endif
375
376 if (protocol_ == protocol_name::udp
377 && (options.type != ZMQ_DISH && options.type != ZMQ_RADIO
378 && options.type != ZMQ_DGRAM)) {
379 errno = ENOCOMPATPROTO;
380 return -1;
381 }
382
383 // Protocol is available.
384 return 0;
385}
386
387void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
388 bool subscribe_to_all_,
389 bool locally_initiated_)
390{
391 // First, register the pipe so that we can terminate it later on.
392 pipe_->set_event_sink (this);
393 _pipes.push_back (pipe_);
394
395 // Let the derived socket type know about new pipe.
396 xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
397
398 // If the socket is already being closed, ask any new pipes to terminate
399 // straight away.
400 if (is_terminating ()) {
401 register_term_acks (1);
402 pipe_->terminate (false);
403 }
404}
405
406int zmq::socket_base_t::setsockopt (int option_,
407 const void *optval_,
408 size_t optvallen_)
409{
410 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
411
412 if (unlikely (_ctx_terminated)) {
413 errno = ETERM;
414 return -1;
415 }
416
417 // First, check whether specific socket type overloads the option.
418 int rc = xsetsockopt (option_, optval_, optvallen_);
419 if (rc == 0 || errno != EINVAL) {
420 return rc;
421 }
422
423 // If the socket type doesn't support the option, pass it to
424 // the generic option parser.
425 rc = options.setsockopt (option_, optval_, optvallen_);
426 update_pipe_options (option_);
427
428 return rc;
429}
430
431int zmq::socket_base_t::getsockopt (int option_,
432 void *optval_,
433 size_t *optvallen_)
434{
435 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
436
437 if (unlikely (_ctx_terminated)) {
438 errno = ETERM;
439 return -1;
440 }
441
442 if (option_ == ZMQ_RCVMORE) {
443 return do_getsockopt<int> (optval_, optvallen_, _rcvmore ? 1 : 0);
444 }
445
446 if (option_ == ZMQ_FD) {
447 if (_thread_safe) {
448 // thread safe socket doesn't provide file descriptor
449 errno = EINVAL;
450 return -1;
451 }
452
453 return do_getsockopt<fd_t> (
454 optval_, optvallen_,
455 (static_cast<mailbox_t *> (_mailbox))->get_fd ());
456 }
457
458 if (option_ == ZMQ_EVENTS) {
459 const int rc = process_commands (0, false);
460 if (rc != 0 && (errno == EINTR || errno == ETERM)) {
461 return -1;
462 }
463 errno_assert (rc == 0);
464
465 return do_getsockopt<int> (optval_, optvallen_,
466 (has_out () ? ZMQ_POLLOUT : 0)
467 | (has_in () ? ZMQ_POLLIN : 0));
468 }
469
470 if (option_ == ZMQ_LAST_ENDPOINT) {
471 return do_getsockopt (optval_, optvallen_, _last_endpoint);
472 }
473
474 if (option_ == ZMQ_THREAD_SAFE) {
475 return do_getsockopt<int> (optval_, optvallen_, _thread_safe ? 1 : 0);
476 }
477
478 return options.getsockopt (option_, optval_, optvallen_);
479}
480
481int zmq::socket_base_t::join (const char *group_)
482{
483 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
484
485 return xjoin (group_);
486}
487
488int zmq::socket_base_t::leave (const char *group_)
489{
490 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
491
492 return xleave (group_);
493}
494
495void zmq::socket_base_t::add_signaler (signaler_t *s_)
496{
497 zmq_assert (_thread_safe);
498
499 scoped_lock_t sync_lock (_sync);
500 (static_cast<mailbox_safe_t *> (_mailbox))->add_signaler (s_);
501}
502
503void zmq::socket_base_t::remove_signaler (signaler_t *s_)
504{
505 zmq_assert (_thread_safe);
506
507 scoped_lock_t sync_lock (_sync);
508 (static_cast<mailbox_safe_t *> (_mailbox))->remove_signaler (s_);
509}
510
511int zmq::socket_base_t::bind (const char *endpoint_uri_)
512{
513 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
514
515 if (unlikely (_ctx_terminated)) {
516 errno = ETERM;
517 return -1;
518 }
519
520 // Process pending commands, if any.
521 int rc = process_commands (0, false);
522 if (unlikely (rc != 0)) {
523 return -1;
524 }
525
526 // Parse endpoint_uri_ string.
527 std::string protocol;
528 std::string address;
529 if (parse_uri (endpoint_uri_, protocol, address)
530 || check_protocol (protocol)) {
531 return -1;
532 }
533
534 if (protocol == protocol_name::inproc) {
535 const endpoint_t endpoint = {this, options};
536 rc = register_endpoint (endpoint_uri_, endpoint);
537 if (rc == 0) {
538 connect_pending (endpoint_uri_, this);
539 _last_endpoint.assign (endpoint_uri_);
540 options.connected = true;
541 }
542 return rc;
543 }
544
545 if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
546 // For convenience's sake, bind can be used interchangeable with
547 // connect for PGM, EPGM, NORM transports.
548 rc = connect (endpoint_uri_);
549 if (rc != -1)
550 options.connected = true;
551 return rc;
552 }
553
554 if (protocol == protocol_name::udp) {
555 if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
556 errno = ENOCOMPATPROTO;
557 return -1;
558 }
559
560 // Choose the I/O thread to run the session in.
561 io_thread_t *io_thread = choose_io_thread (options.affinity);
562 if (!io_thread) {
563 errno = EMTHREAD;
564 return -1;
565 }
566
567 address_t *paddr =
568 new (std::nothrow) address_t (protocol, address, this->get_ctx ());
569 alloc_assert (paddr);
570
571 paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
572 alloc_assert (paddr->resolved.udp_addr);
573 rc = paddr->resolved.udp_addr->resolve (address.c_str (), true,
574 options.ipv6);
575 if (rc != 0) {
576 LIBZMQ_DELETE (paddr);
577 return -1;
578 }
579
580 session_base_t *session =
581 session_base_t::create (io_thread, true, this, options, paddr);
582 errno_assert (session);
583
584 // Create a bi-directional pipe.
585 object_t *parents[2] = {this, session};
586 pipe_t *new_pipes[2] = {NULL, NULL};
587
588 int hwms[2] = {options.sndhwm, options.rcvhwm};
589 bool conflates[2] = {false, false};
590 rc = pipepair (parents, new_pipes, hwms, conflates);
591 errno_assert (rc == 0);
592
593 // Attach local end of the pipe to the socket object.
594 attach_pipe (new_pipes[0], true, true);
595 pipe_t *const newpipe = new_pipes[0];
596
597 // Attach remote end of the pipe to the session object later on.
598 session->attach_pipe (new_pipes[1]);
599
600 // Save last endpoint URI
601 paddr->to_string (_last_endpoint);
602
603 // TODO shouldn't this use _last_endpoint instead of endpoint_uri_? as in the other cases
604 add_endpoint (endpoint_uri_pair_t (endpoint_uri_, std::string (),
605 endpoint_type_none),
606 static_cast<own_t *> (session), newpipe);
607
608 return 0;
609 }
610
611 // Remaining transports require to be run in an I/O thread, so at this
612 // point we'll choose one.
613 io_thread_t *io_thread = choose_io_thread (options.affinity);
614 if (!io_thread) {
615 errno = EMTHREAD;
616 return -1;
617 }
618
619 if (protocol == protocol_name::tcp) {
620 tcp_listener_t *listener =
621 new (std::nothrow) tcp_listener_t (io_thread, this, options);
622 alloc_assert (listener);
623 rc = listener->set_local_address (address.c_str ());
624 if (rc != 0) {
625 LIBZMQ_DELETE (listener);
626 event_bind_failed (make_unconnected_bind_endpoint_pair (address),
627 zmq_errno ());
628 return -1;
629 }
630
631 // Save last endpoint URI
632 listener->get_local_address (_last_endpoint);
633
634 add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
635 static_cast<own_t *> (listener), NULL);
636 options.connected = true;
637 return 0;
638 }
639
640#ifdef ZMQ_HAVE_WS
641#ifdef ZMQ_HAVE_WSS
642 if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
643 ws_listener_t *listener = new (std::nothrow) ws_listener_t (
644 io_thread, this, options, protocol == protocol_name::wss);
645#else
646 if (protocol == protocol_name::ws) {
647 ws_listener_t *listener =
648 new (std::nothrow) ws_listener_t (io_thread, this, options, false);
649#endif
650 alloc_assert (listener);
651 rc = listener->set_local_address (address.c_str ());
652 if (rc != 0) {
653 LIBZMQ_DELETE (listener);
654 event_bind_failed (make_unconnected_bind_endpoint_pair (address),
655 zmq_errno ());
656 return -1;
657 }
658
659 // Save last endpoint URI
660 listener->get_local_address (_last_endpoint);
661
662 add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
663 static_cast<own_t *> (listener), NULL);
664 options.connected = true;
665 return 0;
666 }
667#endif
668
669#if defined ZMQ_HAVE_IPC
670 if (protocol == protocol_name::ipc) {
671 ipc_listener_t *listener =
672 new (std::nothrow) ipc_listener_t (io_thread, this, options);
673 alloc_assert (listener);
674 int rc = listener->set_local_address (address.c_str ());
675 if (rc != 0) {
676 LIBZMQ_DELETE (listener);
677 event_bind_failed (make_unconnected_bind_endpoint_pair (address),
678 zmq_errno ());
679 return -1;
680 }
681
682 // Save last endpoint URI
683 listener->get_local_address (_last_endpoint);
684
685 add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
686 static_cast<own_t *> (listener), NULL);
687 options.connected = true;
688 return 0;
689 }
690#endif
691#if defined ZMQ_HAVE_TIPC
692 if (protocol == protocol_name::tipc) {
693 tipc_listener_t *listener =
694 new (std::nothrow) tipc_listener_t (io_thread, this, options);
695 alloc_assert (listener);
696 int rc = listener->set_local_address (address.c_str ());
697 if (rc != 0) {
698 LIBZMQ_DELETE (listener);
699 event_bind_failed (make_unconnected_bind_endpoint_pair (address),
700 zmq_errno ());
701 return -1;
702 }
703
704 // Save last endpoint URI
705 listener->get_local_address (_last_endpoint);
706
707 // TODO shouldn't this use _last_endpoint as in the other cases?
708 add_endpoint (make_unconnected_bind_endpoint_pair (endpoint_uri_),
709 static_cast<own_t *> (listener), NULL);
710 options.connected = true;
711 return 0;
712 }
713#endif
714#if defined ZMQ_HAVE_VMCI
715 if (protocol == protocol_name::vmci) {
716 vmci_listener_t *listener =
717 new (std::nothrow) vmci_listener_t (io_thread, this, options);
718 alloc_assert (listener);
719 int rc = listener->set_local_address (address.c_str ());
720 if (rc != 0) {
721 LIBZMQ_DELETE (listener);
722 event_bind_failed (make_unconnected_bind_endpoint_pair (address),
723 zmq_errno ());
724 return -1;
725 }
726
727 listener->get_local_address (_last_endpoint);
728
729 add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint),
730 static_cast<own_t *> (listener), NULL);
731 options.connected = true;
732 return 0;
733 }
734#endif
735
736 zmq_assert (false);
737 return -1;
738}
739
740int zmq::socket_base_t::connect (const char *endpoint_uri_)
741{
742 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
743
744 if (unlikely (_ctx_terminated)) {
745 errno = ETERM;
746 return -1;
747 }
748
749 // Process pending commands, if any.
750 int rc = process_commands (0, false);
751 if (unlikely (rc != 0)) {
752 return -1;
753 }
754
755 // Parse endpoint_uri_ string.
756 std::string protocol;
757 std::string address;
758 if (parse_uri (endpoint_uri_, protocol, address)
759 || check_protocol (protocol)) {
760 return -1;
761 }
762
763 if (protocol == protocol_name::inproc) {
764 // TODO: inproc connect is specific with respect to creating pipes
765 // as there's no 'reconnect' functionality implemented. Once that
766 // is in place we should follow generic pipe creation algorithm.
767
768 // Find the peer endpoint.
769 const endpoint_t peer = find_endpoint (endpoint_uri_);
770
771 // The total HWM for an inproc connection should be the sum of
772 // the binder's HWM and the connector's HWM.
773 const int sndhwm = peer.socket == NULL
774 ? options.sndhwm
775 : options.sndhwm != 0 && peer.options.rcvhwm != 0
776 ? options.sndhwm + peer.options.rcvhwm
777 : 0;
778 const int rcvhwm = peer.socket == NULL
779 ? options.rcvhwm
780 : options.rcvhwm != 0 && peer.options.sndhwm != 0
781 ? options.rcvhwm + peer.options.sndhwm
782 : 0;
783
784 // Create a bi-directional pipe to connect the peers.
785 object_t *parents[2] = {this, peer.socket == NULL ? this : peer.socket};
786 pipe_t *new_pipes[2] = {NULL, NULL};
787
788 const bool conflate = get_effective_conflate_option (options);
789
790 int hwms[2] = {conflate ? -1 : sndhwm, conflate ? -1 : rcvhwm};
791 bool conflates[2] = {conflate, conflate};
792 rc = pipepair (parents, new_pipes, hwms, conflates);
793 if (!conflate) {
794 new_pipes[0]->set_hwms_boost (peer.options.sndhwm,
795 peer.options.rcvhwm);
796 new_pipes[1]->set_hwms_boost (options.sndhwm, options.rcvhwm);
797 }
798
799 errno_assert (rc == 0);
800
801 if (!peer.socket) {
802 // The peer doesn't exist yet so we don't know whether
803 // to send the routing id message or not. To resolve this,
804 // we always send our routing id and drop it later if
805 // the peer doesn't expect it.
806 send_routing_id (new_pipes[0], options);
807
808 const endpoint_t endpoint = {this, options};
809 pend_connection (std::string (endpoint_uri_), endpoint, new_pipes);
810 } else {
811 // If required, send the routing id of the local socket to the peer.
812 if (peer.options.recv_routing_id) {
813 send_routing_id (new_pipes[0], options);
814 }
815
816 // If required, send the routing id of the peer to the local socket.
817 if (options.recv_routing_id) {
818 send_routing_id (new_pipes[1], peer.options);
819 }
820
821 // Attach remote end of the pipe to the peer socket. Note that peer's
822 // seqnum was incremented in find_endpoint function. We don't need it
823 // increased here.
824 send_bind (peer.socket, new_pipes[1], false);
825 }
826
827 // Attach local end of the pipe to this socket object.
828 attach_pipe (new_pipes[0], false, true);
829
830 // Save last endpoint URI
831 _last_endpoint.assign (endpoint_uri_);
832
833 // remember inproc connections for disconnect
834 _inprocs.emplace (endpoint_uri_, new_pipes[0]);
835
836 options.connected = true;
837 return 0;
838 }
839 const bool is_single_connect =
840 (options.type == ZMQ_DEALER || options.type == ZMQ_SUB
841 || options.type == ZMQ_PUB || options.type == ZMQ_REQ);
842 if (unlikely (is_single_connect)) {
843 if (0 != _endpoints.count (endpoint_uri_)) {
844 // There is no valid use for multiple connects for SUB-PUB nor
845 // DEALER-ROUTER nor REQ-REP. Multiple connects produces
846 // nonsensical results.
847 return 0;
848 }
849 }
850
851 // Choose the I/O thread to run the session in.
852 io_thread_t *io_thread = choose_io_thread (options.affinity);
853 if (!io_thread) {
854 errno = EMTHREAD;
855 return -1;
856 }
857
858 address_t *paddr =
859 new (std::nothrow) address_t (protocol, address, this->get_ctx ());
860 alloc_assert (paddr);
861
862 // Resolve address (if needed by the protocol)
863 if (protocol == protocol_name::tcp) {
864 // Do some basic sanity checks on tcp:// address syntax
865 // - hostname starts with digit or letter, with embedded '-' or '.'
866 // - IPv6 address may contain hex chars and colons.
867 // - IPv6 link local address may contain % followed by interface name / zone_id
868 // (Reference: https://tools.ietf.org/html/rfc4007)
869 // - IPv4 address may contain decimal digits and dots.
870 // - Address must end in ":port" where port is *, or numeric
871 // - Address may contain two parts separated by ':'
872 // Following code is quick and dirty check to catch obvious errors,
873 // without trying to be fully accurate.
874 const char *check = address.c_str ();
875 if (isalnum (*check) || isxdigit (*check) || *check == '['
876 || *check == ':') {
877 check++;
878 while (isalnum (*check) || isxdigit (*check) || *check == '.'
879 || *check == '-' || *check == ':' || *check == '%'
880 || *check == ';' || *check == '[' || *check == ']'
881 || *check == '_' || *check == '*') {
882 check++;
883 }
884 }
885 // Assume the worst, now look for success
886 rc = -1;
887 // Did we reach the end of the address safely?
888 if (*check == 0) {
889 // Do we have a valid port string? (cannot be '*' in connect
890 check = strrchr (address.c_str (), ':');
891 if (check) {
892 check++;
893 if (*check && (isdigit (*check)))
894 rc = 0; // Valid
895 }
896 }
897 if (rc == -1) {
898 errno = EINVAL;
899 LIBZMQ_DELETE (paddr);
900 return -1;
901 }
902 // Defer resolution until a socket is opened
903 paddr->resolved.tcp_addr = NULL;
904 }
905#ifdef ZMQ_HAVE_WS
906#ifdef ZMQ_HAVE_WSS
907 else if (protocol == protocol_name::ws || protocol == protocol_name::wss) {
908 if (protocol == protocol_name::wss) {
909 paddr->resolved.wss_addr = new (std::nothrow) wss_address_t ();
910 alloc_assert (paddr->resolved.wss_addr);
911 rc = paddr->resolved.wss_addr->resolve (address.c_str (), false,
912 options.ipv6);
913 } else
914#else
915 else if (protocol == protocol_name::ws) {
916#endif
917 {
918 paddr->resolved.ws_addr = new (std::nothrow) ws_address_t ();
919 alloc_assert (paddr->resolved.ws_addr);
920 rc = paddr->resolved.ws_addr->resolve (address.c_str (), false,
921 options.ipv6);
922 }
923
924 if (rc != 0) {
925 LIBZMQ_DELETE (paddr);
926 return -1;
927 }
928 }
929#endif
930
931#if defined ZMQ_HAVE_IPC
932 else if (protocol == protocol_name::ipc) {
933 paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
934 alloc_assert (paddr->resolved.ipc_addr);
935 int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
936 if (rc != 0) {
937 LIBZMQ_DELETE (paddr);
938 return -1;
939 }
940 }
941#endif
942
943 if (protocol == protocol_name::udp) {
944 if (options.type != ZMQ_RADIO) {
945 errno = ENOCOMPATPROTO;
946 LIBZMQ_DELETE (paddr);
947 return -1;
948 }
949
950 paddr->resolved.udp_addr = new (std::nothrow) udp_address_t ();
951 alloc_assert (paddr->resolved.udp_addr);
952 rc = paddr->resolved.udp_addr->resolve (address.c_str (), false,
953 options.ipv6);
954 if (rc != 0) {
955 LIBZMQ_DELETE (paddr);
956 return -1;
957 }
958 }
959
960 // TBD - Should we check address for ZMQ_HAVE_NORM???
961
962#ifdef ZMQ_HAVE_OPENPGM
963 if (protocol == "pgm" || protocol == "epgm") {
964 struct pgm_addrinfo_t *res = NULL;
965 uint16_t port_number = 0;
966 int rc =
967 pgm_socket_t::init_address (address.c_str (), &res, &port_number);
968 if (res != NULL)
969 pgm_freeaddrinfo (res);
970 if (rc != 0 || port_number == 0) {
971 return -1;
972 }
973 }
974#endif
975#if defined ZMQ_HAVE_TIPC
976 else if (protocol == protocol_name::tipc) {
977 paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
978 alloc_assert (paddr->resolved.tipc_addr);
979 int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
980 if (rc != 0) {
981 LIBZMQ_DELETE (paddr);
982 return -1;
983 }
984 sockaddr_tipc *saddr =
985 (sockaddr_tipc *) paddr->resolved.tipc_addr->addr ();
986 // Cannot connect to random Port Identity
987 if (saddr->addrtype == TIPC_ADDR_ID
988 && paddr->resolved.tipc_addr->is_random ()) {
989 LIBZMQ_DELETE (paddr);
990 errno = EINVAL;
991 return -1;
992 }
993 }
994#endif
995#if defined ZMQ_HAVE_VMCI
996 else if (protocol == protocol_name::vmci) {
997 paddr->resolved.vmci_addr =
998 new (std::nothrow) vmci_address_t (this->get_ctx ());
999 alloc_assert (paddr->resolved.vmci_addr);
1000 int rc = paddr->resolved.vmci_addr->resolve (address.c_str ());
1001 if (rc != 0) {
1002 LIBZMQ_DELETE (paddr);
1003 return -1;
1004 }
1005 }
1006#endif
1007
1008 // Create session.
1009 session_base_t *session =
1010 session_base_t::create (io_thread, true, this, options, paddr);
1011 errno_assert (session);
1012
1013 // PGM does not support subscription forwarding; ask for all data to be
1014 // sent to this pipe. (same for NORM, currently?)
1015 const bool subscribe_to_all = protocol == "pgm" || protocol == "epgm"
1016 || protocol == "norm"
1017 || protocol == protocol_name::udp;
1018 pipe_t *newpipe = NULL;
1019
1020 if (options.immediate != 1 || subscribe_to_all) {
1021 // Create a bi-directional pipe.
1022 object_t *parents[2] = {this, session};
1023 pipe_t *new_pipes[2] = {NULL, NULL};
1024
1025 const bool conflate = get_effective_conflate_option (options);
1026
1027 int hwms[2] = {conflate ? -1 : options.sndhwm,
1028 conflate ? -1 : options.rcvhwm};
1029 bool conflates[2] = {conflate, conflate};
1030 rc = pipepair (parents, new_pipes, hwms, conflates);
1031 errno_assert (rc == 0);
1032
1033 // Attach local end of the pipe to the socket object.
1034 attach_pipe (new_pipes[0], subscribe_to_all, true);
1035 newpipe = new_pipes[0];
1036
1037 // Attach remote end of the pipe to the session object later on.
1038 session->attach_pipe (new_pipes[1]);
1039 }
1040
1041 // Save last endpoint URI
1042 paddr->to_string (_last_endpoint);
1043
1044 add_endpoint (make_unconnected_connect_endpoint_pair (endpoint_uri_),
1045 static_cast<own_t *> (session), newpipe);
1046 return 0;
1047}
1048
1049std::string
1050zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_uri_pair_,
1051 const char *tcp_address_)
1052{
1053 // The resolved last_endpoint is used as a key in the endpoints map.
1054 // The address passed by the user might not match in the TCP case due to
1055 // IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
1056 // resolve before giving up. Given at this stage we don't know whether a
1057 // socket is connected or bound, try with both.
1058 if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1059 tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
1060 alloc_assert (tcp_addr);
1061 int rc = tcp_addr->resolve (tcp_address_, false, options.ipv6);
1062
1063 if (rc == 0) {
1064 tcp_addr->to_string (endpoint_uri_pair_);
1065 if (_endpoints.find (endpoint_uri_pair_) == _endpoints.end ()) {
1066 rc = tcp_addr->resolve (tcp_address_, true, options.ipv6);
1067 if (rc == 0) {
1068 tcp_addr->to_string (endpoint_uri_pair_);
1069 }
1070 }
1071 }
1072 LIBZMQ_DELETE (tcp_addr);
1073 }
1074 return endpoint_uri_pair_;
1075}
1076
1077void zmq::socket_base_t::add_endpoint (
1078 const endpoint_uri_pair_t &endpoint_pair_, own_t *endpoint_, pipe_t *pipe_)
1079{
1080 // Activate the session. Make it a child of this socket.
1081 launch_child (endpoint_);
1082 _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (endpoint_pair_.identifier (),
1083 endpoint_pipe_t (endpoint_, pipe_));
1084
1085 if (pipe_ != NULL)
1086 pipe_->set_endpoint_pair (endpoint_pair_);
1087}
1088
1089int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_)
1090{
1091 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1092
1093 // Check whether the context hasn't been shut down yet.
1094 if (unlikely (_ctx_terminated)) {
1095 errno = ETERM;
1096 return -1;
1097 }
1098
1099 // Check whether endpoint address passed to the function is valid.
1100 if (unlikely (!endpoint_uri_)) {
1101 errno = EINVAL;
1102 return -1;
1103 }
1104
1105 // Process pending commands, if any, since there could be pending unprocessed process_own()'s
1106 // (from launch_child() for example) we're asked to terminate now.
1107 const int rc = process_commands (0, false);
1108 if (unlikely (rc != 0)) {
1109 return -1;
1110 }
1111
1112 // Parse endpoint_uri_ string.
1113 std::string uri_protocol;
1114 std::string uri_path;
1115 if (parse_uri (endpoint_uri_, uri_protocol, uri_path)
1116 || check_protocol (uri_protocol)) {
1117 return -1;
1118 }
1119
1120 const std::string endpoint_uri_str = std::string (endpoint_uri_);
1121
1122 // Disconnect an inproc socket
1123 if (uri_protocol == protocol_name::inproc) {
1124 return unregister_endpoint (endpoint_uri_str, this) == 0
1125 ? 0
1126 : _inprocs.erase_pipes (endpoint_uri_str);
1127 }
1128
1129 const std::string resolved_endpoint_uri =
1130 uri_protocol == protocol_name::tcp
1131 ? resolve_tcp_addr (endpoint_uri_str, uri_path.c_str ())
1132 : endpoint_uri_str;
1133
1134 // Find the endpoints range (if any) corresponding to the endpoint_uri_pair_ string.
1135 const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
1136 _endpoints.equal_range (resolved_endpoint_uri);
1137 if (range.first == range.second) {
1138 errno = ENOENT;
1139 return -1;
1140 }
1141
1142 for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
1143 // If we have an associated pipe, terminate it.
1144 if (it->second.second != NULL)
1145 it->second.second->terminate (false);
1146 term_child (it->second.first);
1147 }
1148 _endpoints.erase (range.first, range.second);
1149 return 0;
1150}
1151
1152int zmq::socket_base_t::send (msg_t *msg_, int flags_)
1153{
1154 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1155
1156 // Check whether the context hasn't been shut down yet.
1157 if (unlikely (_ctx_terminated)) {
1158 errno = ETERM;
1159 return -1;
1160 }
1161
1162 // Check whether message passed to the function is valid.
1163 if (unlikely (!msg_ || !msg_->check ())) {
1164 errno = EFAULT;
1165 return -1;
1166 }
1167
1168 // Process pending commands, if any.
1169 int rc = process_commands (0, true);
1170 if (unlikely (rc != 0)) {
1171 return -1;
1172 }
1173
1174 // Clear any user-visible flags that are set on the message.
1175 msg_->reset_flags (msg_t::more);
1176
1177 // At this point we impose the flags on the message.
1178 if (flags_ & ZMQ_SNDMORE)
1179 msg_->set_flags (msg_t::more);
1180
1181 msg_->reset_metadata ();
1182
1183 // Try to send the message using method in each socket class
1184 rc = xsend (msg_);
1185 if (rc == 0) {
1186 return 0;
1187 }
1188 // Special case for ZMQ_PUSH: -2 means pipe is dead while a
1189 // multi-part send is in progress and can't be recovered, so drop
1190 // silently when in blocking mode to keep backward compatibility.
1191 if (unlikely (rc == -2)) {
1192 if (!((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0)) {
1193 rc = msg_->close ();
1194 errno_assert (rc == 0);
1195 rc = msg_->init ();
1196 errno_assert (rc == 0);
1197 return 0;
1198 }
1199 }
1200 if (unlikely (errno != EAGAIN)) {
1201 return -1;
1202 }
1203
1204 // In case of non-blocking send we'll simply propagate
1205 // the error - including EAGAIN - up the stack.
1206 if ((flags_ & ZMQ_DONTWAIT) || options.sndtimeo == 0) {
1207 return -1;
1208 }
1209
1210 // Compute the time when the timeout should occur.
1211 // If the timeout is infinite, don't care.
1212 int timeout = options.sndtimeo;
1213 const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);
1214
1215 // Oops, we couldn't send the message. Wait for the next
1216 // command, process it and try to send the message again.
1217 // If timeout is reached in the meantime, return EAGAIN.
1218 while (true) {
1219 if (unlikely (process_commands (timeout, false) != 0)) {
1220 return -1;
1221 }
1222 rc = xsend (msg_);
1223 if (rc == 0)
1224 break;
1225 if (unlikely (errno != EAGAIN)) {
1226 return -1;
1227 }
1228 if (timeout > 0) {
1229 timeout = static_cast<int> (end - _clock.now_ms ());
1230 if (timeout <= 0) {
1231 errno = EAGAIN;
1232 return -1;
1233 }
1234 }
1235 }
1236
1237 return 0;
1238}
1239
1240int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
1241{
1242 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1243
1244 // Check whether the context hasn't been shut down yet.
1245 if (unlikely (_ctx_terminated)) {
1246 errno = ETERM;
1247 return -1;
1248 }
1249
1250 // Check whether message passed to the function is valid.
1251 if (unlikely (!msg_ || !msg_->check ())) {
1252 errno = EFAULT;
1253 return -1;
1254 }
1255
1256 // Once every inbound_poll_rate messages check for signals and process
1257 // incoming commands. This happens only if we are not polling altogether
1258 // because there are messages available all the time. If poll occurs,
1259 // ticks is set to zero and thus we avoid this code.
1260 //
1261 // Note that 'recv' uses different command throttling algorithm (the one
1262 // described above) from the one used by 'send'. This is because counting
1263 // ticks is more efficient than doing RDTSC all the time.
1264 if (++_ticks == inbound_poll_rate) {
1265 if (unlikely (process_commands (0, false) != 0)) {
1266 return -1;
1267 }
1268 _ticks = 0;
1269 }
1270
1271 // Get the message.
1272 int rc = xrecv (msg_);
1273 if (unlikely (rc != 0 && errno != EAGAIN)) {
1274 return -1;
1275 }
1276
1277 // If we have the message, return immediately.
1278 if (rc == 0) {
1279 extract_flags (msg_);
1280 return 0;
1281 }
1282
1283 // If the message cannot be fetched immediately, there are two scenarios.
1284 // For non-blocking recv, commands are processed in case there's an
1285 // activate_reader command already waiting in a command pipe.
1286 // If it's not, return EAGAIN.
1287 if ((flags_ & ZMQ_DONTWAIT) || options.rcvtimeo == 0) {
1288 if (unlikely (process_commands (0, false) != 0)) {
1289 return -1;
1290 }
1291 _ticks = 0;
1292
1293 rc = xrecv (msg_);
1294 if (rc < 0) {
1295 return rc;
1296 }
1297 extract_flags (msg_);
1298
1299 return 0;
1300 }
1301
1302 // Compute the time when the timeout should occur.
1303 // If the timeout is infinite, don't care.
1304 int timeout = options.rcvtimeo;
1305 const uint64_t end = timeout < 0 ? 0 : (_clock.now_ms () + timeout);
1306
1307 // In blocking scenario, commands are processed over and over again until
1308 // we are able to fetch a message.
1309 bool block = (_ticks != 0);
1310 while (true) {
1311 if (unlikely (process_commands (block ? timeout : 0, false) != 0)) {
1312 return -1;
1313 }
1314 rc = xrecv (msg_);
1315 if (rc == 0) {
1316 _ticks = 0;
1317 break;
1318 }
1319 if (unlikely (errno != EAGAIN)) {
1320 return -1;
1321 }
1322 block = true;
1323 if (timeout > 0) {
1324 timeout = static_cast<int> (end - _clock.now_ms ());
1325 if (timeout <= 0) {
1326 errno = EAGAIN;
1327 return -1;
1328 }
1329 }
1330 }
1331
1332 extract_flags (msg_);
1333 return 0;
1334}
1335
1336int zmq::socket_base_t::close ()
1337{
1338 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1339
1340 // Remove all existing signalers for thread safe sockets
1341 if (_thread_safe)
1342 (static_cast<mailbox_safe_t *> (_mailbox))->clear_signalers ();
1343
1344 // Mark the socket as dead
1345 _tag = 0xdeadbeef;
1346
1347
1348 // Transfer the ownership of the socket from this application thread
1349 // to the reaper thread which will take care of the rest of shutdown
1350 // process.
1351 send_reap (this);
1352
1353 return 0;
1354}
1355
1356bool zmq::socket_base_t::has_in ()
1357{
1358 return xhas_in ();
1359}
1360
1361bool zmq::socket_base_t::has_out ()
1362{
1363 return xhas_out ();
1364}
1365
1366void zmq::socket_base_t::start_reaping (poller_t *poller_)
1367{
1368 // Plug the socket to the reaper thread.
1369 _poller = poller_;
1370
1371 fd_t fd;
1372
1373 if (!_thread_safe)
1374 fd = (static_cast<mailbox_t *> (_mailbox))->get_fd ();
1375 else {
1376 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1377
1378 _reaper_signaler = new (std::nothrow) signaler_t ();
1379 zmq_assert (_reaper_signaler);
1380
1381 // Add signaler to the safe mailbox
1382 fd = _reaper_signaler->get_fd ();
1383 (static_cast<mailbox_safe_t *> (_mailbox))
1384 ->add_signaler (_reaper_signaler);
1385
1386 // Send a signal to make sure reaper handle existing commands
1387 _reaper_signaler->send ();
1388 }
1389
1390 _handle = _poller->add_fd (fd, this);
1391 _poller->set_pollin (_handle);
1392
1393 // Initialise the termination and check whether it can be deallocated
1394 // immediately.
1395 terminate ();
1396 check_destroy ();
1397}
1398
1399int zmq::socket_base_t::process_commands (int timeout_, bool throttle_)
1400{
1401 if (timeout_ == 0) {
1402 // If we are asked not to wait, check whether we haven't processed
1403 // commands recently, so that we can throttle the new commands.
1404
1405 // Get the CPU's tick counter. If 0, the counter is not available.
1406 const uint64_t tsc = zmq::clock_t::rdtsc ();
1407
1408 // Optimised version of command processing - it doesn't have to check
1409 // for incoming commands each time. It does so only if certain time
1410 // elapsed since last command processing. Command delay varies
1411 // depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
1412 // etc. The optimisation makes sense only on platforms where getting
1413 // a timestamp is a very cheap operation (tens of nanoseconds).
1414 if (tsc && throttle_) {
1415 // Check whether TSC haven't jumped backwards (in case of migration
1416 // between CPU cores) and whether certain time have elapsed since
1417 // last command processing. If it didn't do nothing.
1418 if (tsc >= _last_tsc && tsc - _last_tsc <= max_command_delay)
1419 return 0;
1420 _last_tsc = tsc;
1421 }
1422 }
1423
1424 // Check whether there are any commands pending for this thread.
1425 command_t cmd;
1426 int rc = _mailbox->recv (&cmd, timeout_);
1427
1428 // Process all available commands.
1429 while (rc == 0) {
1430 cmd.destination->process_command (cmd);
1431 rc = _mailbox->recv (&cmd, 0);
1432 }
1433
1434 if (errno == EINTR)
1435 return -1;
1436
1437 zmq_assert (errno == EAGAIN);
1438
1439 if (_ctx_terminated) {
1440 errno = ETERM;
1441 return -1;
1442 }
1443
1444 return 0;
1445}
1446
1447void zmq::socket_base_t::process_stop ()
1448{
1449 // Here, someone have called zmq_ctx_term while the socket was still alive.
1450 // We'll remember the fact so that any blocking call is interrupted and any
1451 // further attempt to use the socket will return ETERM. The user is still
1452 // responsible for calling zmq_close on the socket though!
1453 scoped_lock_t lock (_monitor_sync);
1454 stop_monitor ();
1455
1456 _ctx_terminated = true;
1457}
1458
1459void zmq::socket_base_t::process_bind (pipe_t *pipe_)
1460{
1461 attach_pipe (pipe_);
1462}
1463
1464void zmq::socket_base_t::process_term (int linger_)
1465{
1466 // Unregister all inproc endpoints associated with this socket.
1467 // Doing this we make sure that no new pipes from other sockets (inproc)
1468 // will be initiated.
1469 unregister_endpoints (this);
1470
1471 // Ask all attached pipes to terminate.
1472 for (pipes_t::size_type i = 0; i != _pipes.size (); ++i)
1473 _pipes[i]->terminate (false);
1474 register_term_acks (static_cast<int> (_pipes.size ()));
1475
1476 // Continue the termination process immediately.
1477 own_t::process_term (linger_);
1478}
1479
1480void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
1481{
1482 term_endpoint (endpoint_->c_str ());
1483 delete endpoint_;
1484}
1485
1486void zmq::socket_base_t::process_pipe_stats_publish (
1487 uint64_t outbound_queue_count_,
1488 uint64_t inbound_queue_count_,
1489 endpoint_uri_pair_t *endpoint_pair_)
1490{
1491 uint64_t values[2] = {outbound_queue_count_, inbound_queue_count_};
1492 event (*endpoint_pair_, values, 2, ZMQ_EVENT_PIPES_STATS);
1493 delete endpoint_pair_;
1494}
1495
1496/*
1497 * There are 2 pipes per connection, and the inbound one _must_ be queried from
1498 * the I/O thread. So ask the outbound pipe, in the application thread, to send
1499 * a message (pipe_peer_stats) to its peer. The message will carry the outbound
1500 * pipe stats and endpoint, and the reference to the socket object.
1501 * The inbound pipe on the I/O thread will then add its own stats and endpoint,
1502 * and write back a message to the socket object (pipe_stats_publish) which
1503 * will raise an event with the data.
1504 */
1505int zmq::socket_base_t::query_pipes_stats ()
1506{
1507 {
1508 scoped_lock_t lock (_monitor_sync);
1509 if (!(_monitor_events & ZMQ_EVENT_PIPES_STATS)) {
1510 errno = EINVAL;
1511 return -1;
1512 }
1513 }
1514 if (_pipes.size () == 0) {
1515 errno = EAGAIN;
1516 return -1;
1517 }
1518 for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) {
1519 _pipes[i]->send_stats_to_peer (this);
1520 }
1521
1522 return 0;
1523}
1524
1525void zmq::socket_base_t::update_pipe_options (int option_)
1526{
1527 if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
1528 for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) {
1529 _pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
1530 _pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
1531 }
1532 }
1533}
1534
1535void zmq::socket_base_t::process_destroy ()
1536{
1537 _destroyed = true;
1538}
1539
1540int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
1541{
1542 errno = EINVAL;
1543 return -1;
1544}
1545
1546bool zmq::socket_base_t::xhas_out ()
1547{
1548 return false;
1549}
1550
1551int zmq::socket_base_t::xsend (msg_t *)
1552{
1553 errno = ENOTSUP;
1554 return -1;
1555}
1556
1557bool zmq::socket_base_t::xhas_in ()
1558{
1559 return false;
1560}
1561
1562int zmq::socket_base_t::xjoin (const char *group_)
1563{
1564 LIBZMQ_UNUSED (group_);
1565 errno = ENOTSUP;
1566 return -1;
1567}
1568
1569int zmq::socket_base_t::xleave (const char *group_)
1570{
1571 LIBZMQ_UNUSED (group_);
1572 errno = ENOTSUP;
1573 return -1;
1574}
1575
1576int zmq::socket_base_t::xrecv (msg_t *)
1577{
1578 errno = ENOTSUP;
1579 return -1;
1580}
1581
1582void zmq::socket_base_t::xread_activated (pipe_t *)
1583{
1584 zmq_assert (false);
1585}
1586void zmq::socket_base_t::xwrite_activated (pipe_t *)
1587{
1588 zmq_assert (false);
1589}
1590
1591void zmq::socket_base_t::xhiccuped (pipe_t *)
1592{
1593 zmq_assert (false);
1594}
1595
1596void zmq::socket_base_t::in_event ()
1597{
1598 // This function is invoked only once the socket is running in the context
1599 // of the reaper thread. Process any commands from other threads/sockets
1600 // that may be available at the moment. Ultimately, the socket will
1601 // be destroyed.
1602 {
1603 scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
1604
1605 // If the socket is thread safe we need to unsignal the reaper signaler
1606 if (_thread_safe)
1607 _reaper_signaler->recv ();
1608
1609 process_commands (0, false);
1610 }
1611 check_destroy ();
1612}
1613
1614void zmq::socket_base_t::out_event ()
1615{
1616 zmq_assert (false);
1617}
1618
1619void zmq::socket_base_t::timer_event (int)
1620{
1621 zmq_assert (false);
1622}
1623
1624void zmq::socket_base_t::check_destroy ()
1625{
1626 // If the object was already marked as destroyed, finish the deallocation.
1627 if (_destroyed) {
1628 // Remove the socket from the reaper's poller.
1629 _poller->rm_fd (_handle);
1630
1631 // Remove the socket from the context.
1632 destroy_socket (this);
1633
1634 // Notify the reaper about the fact.
1635 send_reaped ();
1636
1637 // Deallocate.
1638 own_t::process_destroy ();
1639 }
1640}
1641
1642void zmq::socket_base_t::read_activated (pipe_t *pipe_)
1643{
1644 xread_activated (pipe_);
1645}
1646
1647void zmq::socket_base_t::write_activated (pipe_t *pipe_)
1648{
1649 xwrite_activated (pipe_);
1650}
1651
1652void zmq::socket_base_t::hiccuped (pipe_t *pipe_)
1653{
1654 if (options.immediate == 1)
1655 pipe_->terminate (false);
1656 else
1657 // Notify derived sockets of the hiccup
1658 xhiccuped (pipe_);
1659}
1660
1661void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
1662{
1663 // Notify the specific socket type about the pipe termination.
1664 xpipe_terminated (pipe_);
1665
1666 // Remove pipe from inproc pipes
1667 _inprocs.erase_pipe (pipe_);
1668
1669 // Remove the pipe from the list of attached pipes and confirm its
1670 // termination if we are already shutting down.
1671 _pipes.erase (pipe_);
1672
1673 // Remove the pipe from _endpoints (set it to NULL).
1674 const std::string &identifier = pipe_->get_endpoint_pair ().identifier ();
1675 if (!identifier.empty ()) {
1676 std::pair<endpoints_t::iterator, endpoints_t::iterator> range;
1677 range = _endpoints.equal_range (identifier);
1678
1679 for (endpoints_t::iterator it = range.first; it != range.second; ++it) {
1680 if (it->second.second == pipe_) {
1681 it->second.second = NULL;
1682 break;
1683 }
1684 }
1685 }
1686
1687 if (is_terminating ())
1688 unregister_term_ack ();
1689}
1690
1691void zmq::socket_base_t::extract_flags (msg_t *msg_)
1692{
1693 // Test whether routing_id flag is valid for this socket type.
1694 if (unlikely (msg_->flags () & msg_t::routing_id))
1695 zmq_assert (options.recv_routing_id);
1696
1697 // Remove MORE flag.
1698 _rcvmore = (msg_->flags () & msg_t::more) != 0;
1699}
1700
1701int zmq::socket_base_t::monitor (const char *endpoint_,
1702 uint64_t events_,
1703 int event_version_,
1704 int type_)
1705{
1706 scoped_lock_t lock (_monitor_sync);
1707
1708 if (unlikely (_ctx_terminated)) {
1709 errno = ETERM;
1710 return -1;
1711 }
1712
1713 // Event version 1 supports only first 16 events.
1714 if (unlikely (event_version_ == 1 && events_ >> 16 != 0)) {
1715 errno = EINVAL;
1716 return -1;
1717 }
1718
1719 // Support deregistering monitoring endpoints as well
1720 if (endpoint_ == NULL) {
1721 stop_monitor ();
1722 return 0;
1723 }
1724 // Parse endpoint_uri_ string.
1725 std::string protocol;
1726 std::string address;
1727 if (parse_uri (endpoint_, protocol, address) || check_protocol (protocol))
1728 return -1;
1729
1730 // Event notification only supported over inproc://
1731 if (protocol != protocol_name::inproc) {
1732 errno = EPROTONOSUPPORT;
1733 return -1;
1734 }
1735
1736 // already monitoring. Stop previous monitor before starting new one.
1737 if (_monitor_socket != NULL) {
1738 stop_monitor (true);
1739 }
1740
1741 // Check if the specified socket type is supported. It must be a
1742 // one-way socket types that support the SNDMORE flag.
1743 switch (type_) {
1744 case ZMQ_PAIR:
1745 break;
1746 case ZMQ_PUB:
1747 break;
1748 case ZMQ_PUSH:
1749 break;
1750 default:
1751 errno = EINVAL;
1752 return -1;
1753 }
1754
1755 // Register events to monitor
1756 _monitor_events = events_;
1757 options.monitor_event_version = event_version_;
1758 // Create a monitor socket of the specified type.
1759 _monitor_socket = zmq_socket (get_ctx (), type_);
1760 if (_monitor_socket == NULL)
1761 return -1;
1762
1763 // Never block context termination on pending event messages
1764 int linger = 0;
1765 int rc =
1766 zmq_setsockopt (_monitor_socket, ZMQ_LINGER, &linger, sizeof (linger));
1767 if (rc == -1)
1768 stop_monitor (false);
1769
1770 // Spawn the monitor socket endpoint
1771 rc = zmq_bind (_monitor_socket, endpoint_);
1772 if (rc == -1)
1773 stop_monitor (false);
1774 return rc;
1775}
1776
1777void zmq::socket_base_t::event_connected (
1778 const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1779{
1780 uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1781 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECTED);
1782}
1783
1784void zmq::socket_base_t::event_connect_delayed (
1785 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1786{
1787 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1788 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_DELAYED);
1789}
1790
1791void zmq::socket_base_t::event_connect_retried (
1792 const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_)
1793{
1794 uint64_t values[1] = {static_cast<uint64_t> (interval_)};
1795 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_RETRIED);
1796}
1797
1798void zmq::socket_base_t::event_listening (
1799 const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1800{
1801 uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1802 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_LISTENING);
1803}
1804
1805void zmq::socket_base_t::event_bind_failed (
1806 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1807{
1808 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1809 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_BIND_FAILED);
1810}
1811
1812void zmq::socket_base_t::event_accepted (
1813 const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1814{
1815 uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1816 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPTED);
1817}
1818
1819void zmq::socket_base_t::event_accept_failed (
1820 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1821{
1822 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1823 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPT_FAILED);
1824}
1825
1826void zmq::socket_base_t::event_closed (
1827 const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1828{
1829 uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1830 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSED);
1831}
1832
1833void zmq::socket_base_t::event_close_failed (
1834 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1835{
1836 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1837 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSE_FAILED);
1838}
1839
1840void zmq::socket_base_t::event_disconnected (
1841 const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
1842{
1843 uint64_t values[1] = {static_cast<uint64_t> (fd_)};
1844 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_DISCONNECTED);
1845}
1846
1847void zmq::socket_base_t::event_handshake_failed_no_detail (
1848 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1849{
1850 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1851 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
1852}
1853
1854void zmq::socket_base_t::event_handshake_failed_protocol (
1855 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1856{
1857 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1858 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
1859}
1860
1861void zmq::socket_base_t::event_handshake_failed_auth (
1862 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1863{
1864 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1865 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
1866}
1867
1868void zmq::socket_base_t::event_handshake_succeeded (
1869 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
1870{
1871 uint64_t values[1] = {static_cast<uint64_t> (err_)};
1872 event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
1873}
1874
1875void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_,
1876 uint64_t values_[],
1877 uint64_t values_count_,
1878 uint64_t type_)
1879{
1880 scoped_lock_t lock (_monitor_sync);
1881 if (_monitor_events & type_) {
1882 monitor_event (type_, values_, values_count_, endpoint_uri_pair_);
1883 }
1884}
1885
1886// Send a monitor event
1887void zmq::socket_base_t::monitor_event (
1888 uint64_t event_,
1889 uint64_t values_[],
1890 uint64_t values_count_,
1891 const endpoint_uri_pair_t &endpoint_uri_pair_) const
1892{
1893 // this is a private method which is only called from
1894 // contexts where the _monitor_sync mutex has been locked before
1895
1896 if (_monitor_socket) {
1897 zmq_msg_t msg;
1898
1899 switch (options.monitor_event_version) {
1900 case 1: {
1901 // The API should not allow to activate unsupported events
1902 zmq_assert (event_ <= std::numeric_limits<uint16_t>::max ());
1903 // v1 only allows one value
1904 zmq_assert (values_count_ == 1);
1905 zmq_assert (values_[0]
1906 <= std::numeric_limits<uint32_t>::max ());
1907
1908 // Send event and value in first frame
1909 const uint16_t event = static_cast<uint16_t> (event_);
1910 const uint32_t value = static_cast<uint32_t> (values_[0]);
1911 zmq_msg_init_size (&msg, sizeof (event) + sizeof (value));
1912 uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
1913 // Avoid dereferencing uint32_t on unaligned address
1914 memcpy (data + 0, &event, sizeof (event));
1915 memcpy (data + sizeof (event), &value, sizeof (value));
1916 zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
1917
1918 const std::string &endpoint_uri =
1919 endpoint_uri_pair_.identifier ();
1920
1921 // Send address in second frame
1922 zmq_msg_init_size (&msg, endpoint_uri.size ());
1923 memcpy (zmq_msg_data (&msg), endpoint_uri.c_str (),
1924 endpoint_uri.size ());
1925 zmq_msg_send (&msg, _monitor_socket, 0);
1926 } break;
1927 case 2: {
1928 // Send event in first frame (64bit unsigned)
1929 zmq_msg_init_size (&msg, sizeof (event_));
1930 memcpy (zmq_msg_data (&msg), &event_, sizeof (event_));
1931 zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
1932
1933 // Send number of values that will follow in second frame
1934 zmq_msg_init_size (&msg, sizeof (values_count_));
1935 memcpy (zmq_msg_data (&msg), &values_count_,
1936 sizeof (values_count_));
1937 zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
1938
1939 // Send values in third-Nth frames (64bit unsigned)
1940 for (uint64_t i = 0; i < values_count_; ++i) {
1941 zmq_msg_init_size (&msg, sizeof (values_[i]));
1942 memcpy (zmq_msg_data (&msg), &values_[i],
1943 sizeof (values_[i]));
1944 zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
1945 }
1946
1947 // Send local endpoint URI in second-to-last frame (string)
1948 zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ());
1949 memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (),
1950 endpoint_uri_pair_.local.size ());
1951 zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
1952
1953 // Send remote endpoint URI in last frame (string)
1954 zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ());
1955 memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (),
1956 endpoint_uri_pair_.remote.size ());
1957 zmq_msg_send (&msg, _monitor_socket, 0);
1958 } break;
1959 }
1960 }
1961}
1962
1963void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
1964{
1965 // this is a private method which is only called from
1966 // contexts where the _monitor_sync mutex has been locked before
1967
1968 if (_monitor_socket) {
1969 if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
1970 && send_monitor_stopped_event_) {
1971 uint64_t values[1] = {0};
1972 monitor_event (ZMQ_EVENT_MONITOR_STOPPED, values, 1,
1973 endpoint_uri_pair_t ());
1974 }
1975 zmq_close (_monitor_socket);
1976 _monitor_socket = NULL;
1977 _monitor_events = 0;
1978 }
1979}
1980
1981zmq::routing_socket_base_t::routing_socket_base_t (class ctx_t *parent_,
1982 uint32_t tid_,
1983 int sid_) :
1984 socket_base_t (parent_, tid_, sid_)
1985{
1986}
1987
1988zmq::routing_socket_base_t::~routing_socket_base_t ()
1989{
1990 zmq_assert (_out_pipes.empty ());
1991}
1992
1993int zmq::routing_socket_base_t::xsetsockopt (int option_,
1994 const void *optval_,
1995 size_t optvallen_)
1996{
1997 switch (option_) {
1998 case ZMQ_CONNECT_ROUTING_ID:
1999 // TODO why isn't it possible to set an empty connect_routing_id
2000 // (which is the default value)
2001 if (optval_ && optvallen_) {
2002 _connect_routing_id.assign (static_cast<const char *> (optval_),
2003 optvallen_);
2004 return 0;
2005 }
2006 break;
2007 }
2008 errno = EINVAL;
2009 return -1;
2010}
2011
2012void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_)
2013{
2014 const out_pipes_t::iterator end = _out_pipes.end ();
2015 out_pipes_t::iterator it;
2016 for (it = _out_pipes.begin (); it != end; ++it)
2017 if (it->second.pipe == pipe_)
2018 break;
2019
2020 zmq_assert (it != end);
2021 zmq_assert (!it->second.active);
2022 it->second.active = true;
2023}
2024
2025std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
2026{
2027 std::string res = ZMQ_MOVE (_connect_routing_id);
2028 _connect_routing_id.clear ();
2029 return res;
2030}
2031
2032bool zmq::routing_socket_base_t::connect_routing_id_is_set () const
2033{
2034 return !_connect_routing_id.empty ();
2035}
2036
2037void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id_,
2038 pipe_t *pipe_)
2039{
2040 // Add the record into output pipes lookup table
2041 const out_pipe_t outpipe = {pipe_, true};
2042 const bool ok =
2043 _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id_), outpipe)
2044 .second;
2045 zmq_assert (ok);
2046}
2047
2048bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id_) const
2049{
2050 return 0 != _out_pipes.count (routing_id_);
2051}
2052
2053zmq::routing_socket_base_t::out_pipe_t *
2054zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_)
2055{
2056 // TODO we could probably avoid constructor a temporary blob_t to call this function
2057 out_pipes_t::iterator it = _out_pipes.find (routing_id_);
2058 return it == _out_pipes.end () ? NULL : &it->second;
2059}
2060
2061const zmq::routing_socket_base_t::out_pipe_t *
2062zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_) const
2063{
2064 // TODO we could probably avoid constructor a temporary blob_t to call this function
2065 const out_pipes_t::const_iterator it = _out_pipes.find (routing_id_);
2066 return it == _out_pipes.end () ? NULL : &it->second;
2067}
2068
2069void zmq::routing_socket_base_t::erase_out_pipe (pipe_t *pipe_)
2070{
2071 const size_t erased = _out_pipes.erase (pipe_->get_routing_id ());
2072 zmq_assert (erased);
2073}
2074
2075zmq::routing_socket_base_t::out_pipe_t
2076zmq::routing_socket_base_t::try_erase_out_pipe (const blob_t &routing_id_)
2077{
2078 const out_pipes_t::iterator it = _out_pipes.find (routing_id_);
2079 out_pipe_t res = {NULL, false};
2080 if (it != _out_pipes.end ()) {
2081 res = it->second;
2082 _out_pipes.erase (it);
2083 }
2084 return res;
2085}
2086