1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include "macros.hpp"
32#include "session_base.hpp"
33#include "i_engine.hpp"
34#include "err.hpp"
35#include "pipe.hpp"
36#include "likely.hpp"
37#include "tcp_connecter.hpp"
38#include "ws_connecter.hpp"
39#include "ipc_connecter.hpp"
40#include "tipc_connecter.hpp"
41#include "socks_connecter.hpp"
42#include "vmci_connecter.hpp"
43#include "pgm_sender.hpp"
44#include "pgm_receiver.hpp"
45#include "address.hpp"
46#include "norm_engine.hpp"
47#include "udp_engine.hpp"
48
49#include "ctx.hpp"
50#include "req.hpp"
51#include "radio.hpp"
52#include "dish.hpp"
53
54zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
55 bool active_,
56 class socket_base_t *socket_,
57 const options_t &options_,
58 address_t *addr_)
59{
60 session_base_t *s = NULL;
61 switch (options_.type) {
62 case ZMQ_REQ:
63 s = new (std::nothrow)
64 req_session_t (io_thread_, active_, socket_, options_, addr_);
65 break;
66 case ZMQ_RADIO:
67 s = new (std::nothrow)
68 radio_session_t (io_thread_, active_, socket_, options_, addr_);
69 break;
70 case ZMQ_DISH:
71 s = new (std::nothrow)
72 dish_session_t (io_thread_, active_, socket_, options_, addr_);
73 break;
74 case ZMQ_DEALER:
75 case ZMQ_REP:
76 case ZMQ_ROUTER:
77 case ZMQ_PUB:
78 case ZMQ_XPUB:
79 case ZMQ_SUB:
80 case ZMQ_XSUB:
81 case ZMQ_PUSH:
82 case ZMQ_PULL:
83 case ZMQ_PAIR:
84 case ZMQ_STREAM:
85 case ZMQ_SERVER:
86 case ZMQ_CLIENT:
87 case ZMQ_GATHER:
88 case ZMQ_SCATTER:
89 case ZMQ_DGRAM:
90 s = new (std::nothrow)
91 session_base_t (io_thread_, active_, socket_, options_, addr_);
92 break;
93 default:
94 errno = EINVAL;
95 return NULL;
96 }
97 alloc_assert (s);
98 return s;
99}
100
101zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
102 bool active_,
103 class socket_base_t *socket_,
104 const options_t &options_,
105 address_t *addr_) :
106 own_t (io_thread_, options_),
107 io_object_t (io_thread_),
108 _active (active_),
109 _pipe (NULL),
110 _zap_pipe (NULL),
111 _incomplete_in (false),
112 _pending (false),
113 _engine (NULL),
114 _socket (socket_),
115 _io_thread (io_thread_),
116 _has_linger_timer (false),
117 _addr (addr_),
118 _wss_hostname (NULL)
119{
120 if (options_.wss_hostname.length () > 0) {
121 _wss_hostname =
122 static_cast<char *> (malloc (options_.wss_hostname.length () + 1));
123 assert (_wss_hostname);
124 strcpy (_wss_hostname, options_.wss_hostname.c_str ());
125 }
126}
127
128const zmq::endpoint_uri_pair_t &zmq::session_base_t::get_endpoint () const
129{
130 return _engine->get_endpoint ();
131}
132
133zmq::session_base_t::~session_base_t ()
134{
135 zmq_assert (!_pipe);
136 zmq_assert (!_zap_pipe);
137
138 // If there's still a pending linger timer, remove it.
139 if (_has_linger_timer) {
140 cancel_timer (linger_timer_id);
141 _has_linger_timer = false;
142 }
143
144 // Close the engine.
145 if (_engine)
146 _engine->terminate ();
147
148 if (_wss_hostname)
149 free (_wss_hostname);
150
151 LIBZMQ_DELETE (_addr);
152}
153
154void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
155{
156 zmq_assert (!is_terminating ());
157 zmq_assert (!_pipe);
158 zmq_assert (pipe_);
159 _pipe = pipe_;
160 _pipe->set_event_sink (this);
161}
162
163int zmq::session_base_t::pull_msg (msg_t *msg_)
164{
165 if (!_pipe || !_pipe->read (msg_)) {
166 errno = EAGAIN;
167 return -1;
168 }
169
170 _incomplete_in = (msg_->flags () & msg_t::more) != 0;
171
172 return 0;
173}
174
175int zmq::session_base_t::push_msg (msg_t *msg_)
176{
177 // pass subscribe/cancel to the sockets
178 if ((msg_->flags () & msg_t::command) && !msg_->is_subscribe ()
179 && !msg_->is_cancel ())
180 return 0;
181 if (_pipe && _pipe->write (msg_)) {
182 int rc = msg_->init ();
183 errno_assert (rc == 0);
184 return 0;
185 }
186
187 errno = EAGAIN;
188 return -1;
189}
190
191int zmq::session_base_t::read_zap_msg (msg_t *msg_)
192{
193 if (_zap_pipe == NULL) {
194 errno = ENOTCONN;
195 return -1;
196 }
197
198 if (!_zap_pipe->read (msg_)) {
199 errno = EAGAIN;
200 return -1;
201 }
202
203 return 0;
204}
205
206int zmq::session_base_t::write_zap_msg (msg_t *msg_)
207{
208 if (_zap_pipe == NULL || !_zap_pipe->write (msg_)) {
209 errno = ENOTCONN;
210 return -1;
211 }
212
213 if ((msg_->flags () & msg_t::more) == 0)
214 _zap_pipe->flush ();
215
216 const int rc = msg_->init ();
217 errno_assert (rc == 0);
218 return 0;
219}
220
221void zmq::session_base_t::reset ()
222{
223}
224
225void zmq::session_base_t::flush ()
226{
227 if (_pipe)
228 _pipe->flush ();
229}
230
231void zmq::session_base_t::rollback ()
232{
233 if (_pipe)
234 _pipe->rollback ();
235}
236
237void zmq::session_base_t::clean_pipes ()
238{
239 zmq_assert (_pipe != NULL);
240
241 // Get rid of half-processed messages in the out pipe. Flush any
242 // unflushed messages upstream.
243 _pipe->rollback ();
244 _pipe->flush ();
245
246 // Remove any half-read message from the in pipe.
247 while (_incomplete_in) {
248 msg_t msg;
249 int rc = msg.init ();
250 errno_assert (rc == 0);
251 rc = pull_msg (&msg);
252 errno_assert (rc == 0);
253 rc = msg.close ();
254 errno_assert (rc == 0);
255 }
256}
257
258void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
259{
260 // Drop the reference to the deallocated pipe if required.
261 zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
262 || _terminating_pipes.count (pipe_) == 1);
263
264 if (pipe_ == _pipe) {
265 // If this is our current pipe, remove it
266 _pipe = NULL;
267 if (_has_linger_timer) {
268 cancel_timer (linger_timer_id);
269 _has_linger_timer = false;
270 }
271 } else if (pipe_ == _zap_pipe)
272 _zap_pipe = NULL;
273 else
274 // Remove the pipe from the detached pipes set
275 _terminating_pipes.erase (pipe_);
276
277 if (!is_terminating () && options.raw_socket) {
278 if (_engine) {
279 _engine->terminate ();
280 _engine = NULL;
281 }
282 terminate ();
283 }
284
285 // If we are waiting for pending messages to be sent, at this point
286 // we are sure that there will be no more messages and we can proceed
287 // with termination safely.
288 if (_pending && !_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
289 _pending = false;
290 own_t::process_term (0);
291 }
292}
293
294void zmq::session_base_t::read_activated (pipe_t *pipe_)
295{
296 // Skip activating if we're detaching this pipe
297 if (unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
298 zmq_assert (_terminating_pipes.count (pipe_) == 1);
299 return;
300 }
301
302 if (unlikely (_engine == NULL)) {
303 _pipe->check_read ();
304 return;
305 }
306
307 if (likely (pipe_ == _pipe))
308 _engine->restart_output ();
309 else {
310 // i.e. pipe_ == zap_pipe
311 _engine->zap_msg_available ();
312 }
313}
314
315void zmq::session_base_t::write_activated (pipe_t *pipe_)
316{
317 // Skip activating if we're detaching this pipe
318 if (_pipe != pipe_) {
319 zmq_assert (_terminating_pipes.count (pipe_) == 1);
320 return;
321 }
322
323 if (_engine)
324 _engine->restart_input ();
325}
326
327void zmq::session_base_t::hiccuped (pipe_t *)
328{
329 // Hiccups are always sent from session to socket, not the other
330 // way round.
331 zmq_assert (false);
332}
333
334zmq::socket_base_t *zmq::session_base_t::get_socket ()
335{
336 return _socket;
337}
338
339void zmq::session_base_t::process_plug ()
340{
341 if (_active)
342 start_connecting (false);
343}
344
345// This functions can return 0 on success or -1 and errno=ECONNREFUSED if ZAP
346// is not setup (IE: inproc://zeromq.zap.01 does not exist in the same context)
347// or it aborts on any other error. In other words, either ZAP is not
348// configured or if it is configured it MUST be configured correctly and it
349// MUST work, otherwise authentication cannot be guaranteed and it would be a
350// security flaw.
351int zmq::session_base_t::zap_connect ()
352{
353 if (_zap_pipe != NULL)
354 return 0;
355
356 endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
357 if (peer.socket == NULL) {
358 errno = ECONNREFUSED;
359 return -1;
360 }
361 zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
362 || peer.options.type == ZMQ_SERVER);
363
364 // Create a bi-directional pipe that will connect
365 // session with zap socket.
366 object_t *parents[2] = {this, peer.socket};
367 pipe_t *new_pipes[2] = {NULL, NULL};
368 int hwms[2] = {0, 0};
369 bool conflates[2] = {false, false};
370 int rc = pipepair (parents, new_pipes, hwms, conflates);
371 errno_assert (rc == 0);
372
373 // Attach local end of the pipe to this socket object.
374 _zap_pipe = new_pipes[0];
375 _zap_pipe->set_nodelay ();
376 _zap_pipe->set_event_sink (this);
377
378 send_bind (peer.socket, new_pipes[1], false);
379
380 // Send empty routing id if required by the peer.
381 if (peer.options.recv_routing_id) {
382 msg_t id;
383 rc = id.init ();
384 errno_assert (rc == 0);
385 id.set_flags (msg_t::routing_id);
386 bool ok = _zap_pipe->write (&id);
387 zmq_assert (ok);
388 _zap_pipe->flush ();
389 }
390
391 return 0;
392}
393
394bool zmq::session_base_t::zap_enabled ()
395{
396 return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
397}
398
399void zmq::session_base_t::process_attach (i_engine *engine_)
400{
401 zmq_assert (engine_ != NULL);
402
403 // Create the pipe if it does not exist yet.
404 if (!_pipe && !is_terminating ()) {
405 object_t *parents[2] = {this, _socket};
406 pipe_t *pipes[2] = {NULL, NULL};
407
408 const bool conflate = get_effective_conflate_option (options);
409
410 int hwms[2] = {conflate ? -1 : options.rcvhwm,
411 conflate ? -1 : options.sndhwm};
412 bool conflates[2] = {conflate, conflate};
413 int rc = pipepair (parents, pipes, hwms, conflates);
414 errno_assert (rc == 0);
415
416 // Plug the local end of the pipe.
417 pipes[0]->set_event_sink (this);
418
419 // Remember the local end of the pipe.
420 zmq_assert (!_pipe);
421 _pipe = pipes[0];
422
423 // The endpoints strings are not set on bind, set them here so that
424 // events can use them.
425 pipes[0]->set_endpoint_pair (engine_->get_endpoint ());
426 pipes[1]->set_endpoint_pair (engine_->get_endpoint ());
427
428 // Ask socket to plug into the remote end of the pipe.
429 send_bind (_socket, pipes[1]);
430 }
431
432 // Plug in the engine.
433 zmq_assert (!_engine);
434 _engine = engine_;
435 _engine->plug (_io_thread, this);
436}
437
438void zmq::session_base_t::engine_error (zmq::i_engine::error_reason_t reason_)
439{
440 // Engine is dead. Let's forget about it.
441 _engine = NULL;
442
443 // Remove any half-done messages from the pipes.
444 if (_pipe)
445 clean_pipes ();
446
447 zmq_assert (reason_ == i_engine::connection_error
448 || reason_ == i_engine::timeout_error
449 || reason_ == i_engine::protocol_error);
450
451 switch (reason_) {
452 case i_engine::timeout_error:
453 /* FALLTHROUGH */
454 case i_engine::connection_error:
455 if (_active) {
456 reconnect ();
457 break;
458 }
459 /* FALLTHROUGH */
460 case i_engine::protocol_error:
461 if (_pending) {
462 if (_pipe)
463 _pipe->terminate (false);
464 if (_zap_pipe)
465 _zap_pipe->terminate (false);
466 } else {
467 terminate ();
468 }
469 break;
470 }
471
472 // Just in case there's only a delimiter in the pipe.
473 if (_pipe)
474 _pipe->check_read ();
475
476 if (_zap_pipe)
477 _zap_pipe->check_read ();
478}
479
480void zmq::session_base_t::process_term (int linger_)
481{
482 zmq_assert (!_pending);
483
484 // If the termination of the pipe happens before the term command is
485 // delivered there's nothing much to do. We can proceed with the
486 // standard termination immediately.
487 if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
488 own_t::process_term (0);
489 return;
490 }
491
492 _pending = true;
493
494 if (_pipe != NULL) {
495 // If there's finite linger value, delay the termination.
496 // If linger is infinite (negative) we don't even have to set
497 // the timer.
498 if (linger_ > 0) {
499 zmq_assert (!_has_linger_timer);
500 add_timer (linger_, linger_timer_id);
501 _has_linger_timer = true;
502 }
503
504 // Start pipe termination process. Delay the termination till all messages
505 // are processed in case the linger time is non-zero.
506 _pipe->terminate (linger_ != 0);
507
508 // TODO: Should this go into pipe_t::terminate ?
509 // In case there's no engine and there's only delimiter in the
510 // pipe it wouldn't be ever read. Thus we check for it explicitly.
511 if (!_engine)
512 _pipe->check_read ();
513 }
514
515 if (_zap_pipe != NULL)
516 _zap_pipe->terminate (false);
517}
518
519void zmq::session_base_t::timer_event (int id_)
520{
521 // Linger period expired. We can proceed with termination even though
522 // there are still pending messages to be sent.
523 zmq_assert (id_ == linger_timer_id);
524 _has_linger_timer = false;
525
526 // Ask pipe to terminate even though there may be pending messages in it.
527 zmq_assert (_pipe);
528 _pipe->terminate (false);
529}
530
531void zmq::session_base_t::reconnect ()
532{
533 // For delayed connect situations, terminate the pipe
534 // and reestablish later on
535 if (_pipe && options.immediate == 1 && _addr->protocol != "pgm"
536 && _addr->protocol != "epgm" && _addr->protocol != "norm"
537 && _addr->protocol != protocol_name::udp) {
538 _pipe->hiccup ();
539 _pipe->terminate (false);
540 _terminating_pipes.insert (_pipe);
541 _pipe = NULL;
542
543 if (_has_linger_timer) {
544 cancel_timer (linger_timer_id);
545 _has_linger_timer = false;
546 }
547 }
548
549 reset ();
550
551 // Reconnect.
552 if (options.reconnect_ivl != -1)
553 start_connecting (true);
554 else {
555 std::string *ep = new (std::string);
556 _addr->to_string (*ep);
557 send_term_endpoint (_socket, ep);
558 }
559
560 // For subscriber sockets we hiccup the inbound pipe, which will cause
561 // the socket object to resend all the subscriptions.
562 if (_pipe
563 && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
564 || options.type == ZMQ_DISH))
565 _pipe->hiccup ();
566}
567
568zmq::session_base_t::connecter_factory_entry_t
569 zmq::session_base_t::_connecter_factories[] = {
570 connecter_factory_entry_t (protocol_name::tcp,
571 &zmq::session_base_t::create_connecter_tcp),
572#ifdef ZMQ_HAVE_WS
573 connecter_factory_entry_t (protocol_name::ws,
574 &zmq::session_base_t::create_connecter_ws),
575#endif
576#ifdef ZMQ_HAVE_WSS
577 connecter_factory_entry_t (protocol_name::wss,
578 &zmq::session_base_t::create_connecter_wss),
579#endif
580#if defined ZMQ_HAVE_IPC
581 connecter_factory_entry_t (protocol_name::ipc,
582 &zmq::session_base_t::create_connecter_ipc),
583#endif
584#if defined ZMQ_HAVE_TIPC
585 connecter_factory_entry_t (protocol_name::tipc,
586 &zmq::session_base_t::create_connecter_tipc),
587#endif
588#if defined ZMQ_HAVE_VMCI
589 connecter_factory_entry_t (protocol_name::vmci,
590 &zmq::session_base_t::create_connecter_vmci),
591#endif
592};
593
594zmq::session_base_t::connecter_factory_map_t
595 zmq::session_base_t::_connecter_factories_map (
596 _connecter_factories,
597 _connecter_factories
598 + sizeof (_connecter_factories) / sizeof (_connecter_factories[0]));
599
600zmq::session_base_t::start_connecting_entry_t
601 zmq::session_base_t::_start_connecting_entries[] = {
602 start_connecting_entry_t (protocol_name::udp,
603 &zmq::session_base_t::start_connecting_udp),
604#if defined ZMQ_HAVE_OPENPGM
605 start_connecting_entry_t ("pgm",
606 &zmq::session_base_t::start_connecting_pgm),
607 start_connecting_entry_t ("epgm",
608 &zmq::session_base_t::start_connecting_pgm),
609#endif
610#if defined ZMQ_HAVE_NORM
611 start_connecting_entry_t ("norm",
612 &zmq::session_base_t::start_connecting_norm),
613#endif
614};
615
616zmq::session_base_t::start_connecting_map_t
617 zmq::session_base_t::_start_connecting_map (
618 _start_connecting_entries,
619 _start_connecting_entries
620 + sizeof (_start_connecting_entries)
621 / sizeof (_start_connecting_entries[0]));
622
623void zmq::session_base_t::start_connecting (bool wait_)
624{
625 zmq_assert (_active);
626
627 // Choose I/O thread to run connecter in. Given that we are already
628 // running in an I/O thread, there must be at least one available.
629 io_thread_t *io_thread = choose_io_thread (options.affinity);
630 zmq_assert (io_thread);
631
632 // Create the connecter object.
633 const connecter_factory_map_t::const_iterator connecter_factories_it =
634 _connecter_factories_map.find (_addr->protocol);
635 if (connecter_factories_it != _connecter_factories_map.end ()) {
636 own_t *connecter =
637 (this->*connecter_factories_it->second) (io_thread, wait_);
638
639 alloc_assert (connecter);
640 launch_child (connecter);
641 return;
642 }
643 const start_connecting_map_t::const_iterator start_connecting_it =
644 _start_connecting_map.find (_addr->protocol);
645 if (start_connecting_it != _start_connecting_map.end ()) {
646 (this->*start_connecting_it->second) (io_thread);
647 return;
648 }
649
650 zmq_assert (false);
651}
652
#if defined ZMQ_HAVE_VMCI
//  Connecter factory for the VMCI transport. Returns the newly allocated
//  connecter, or NULL if the allocation failed (the result is checked by
//  start_connecting ()).
zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_,
                                                        bool wait_)
{
    vmci_connecter_t *const connecter = new (std::nothrow)
      vmci_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
#endif
661
#if defined ZMQ_HAVE_TIPC
//  Connecter factory for the TIPC transport. Returns the newly allocated
//  connecter, or NULL if the allocation failed (the result is checked by
//  start_connecting ()).
zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_,
                                                        bool wait_)
{
    tipc_connecter_t *const connecter = new (std::nothrow)
      tipc_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
#endif
670
#if defined ZMQ_HAVE_IPC
//  Connecter factory for the IPC transport. Returns the newly allocated
//  connecter, or NULL if the allocation failed (the result is checked by
//  start_connecting ()).
zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_,
                                                       bool wait_)
{
    ipc_connecter_t *const connecter = new (std::nothrow)
      ipc_connecter_t (io_thread_, this, options, _addr, wait_);
    return connecter;
}
#endif
679
680zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_,
681 bool wait_)
682{
683 if (!options.socks_proxy_address.empty ()) {
684 address_t *proxy_address = new (std::nothrow) address_t (
685 protocol_name::tcp, options.socks_proxy_address, this->get_ctx ());
686 alloc_assert (proxy_address);
687 socks_connecter_t *connecter = new (std::nothrow) socks_connecter_t (
688 io_thread_, this, options, _addr, proxy_address, wait_);
689 alloc_assert (connecter);
690 if (!options.socks_proxy_username.empty ()) {
691 connecter->set_auth_method_basic (options.socks_proxy_username,
692 options.socks_proxy_password);
693 }
694 return connecter;
695 }
696 return new (std::nothrow)
697 tcp_connecter_t (io_thread_, this, options, _addr, wait_);
698}
699
#ifdef ZMQ_HAVE_WS
//  Connecter factory for the WS transport: the connecter is created with
//  its last two arguments set to false and NULL (the WSS variant passes
//  true and the stored hostname instead). Returns NULL if the allocation
//  failed (the result is checked by start_connecting ()).
zmq::own_t *zmq::session_base_t::create_connecter_ws (io_thread_t *io_thread_,
                                                      bool wait_)
{
    ws_connecter_t *const connecter = new (std::nothrow)
      ws_connecter_t (io_thread_, this, options, _addr, wait_, false, NULL);
    return connecter;
}
#endif
708
#ifdef ZMQ_HAVE_WSS
//  Connecter factory for the WSS transport: same connecter class as WS,
//  created with its last two arguments set to true and the hostname copy
//  stored in _wss_hostname (which may be NULL). Returns NULL if the
//  allocation failed (the result is checked by start_connecting ()).
zmq::own_t *zmq::session_base_t::create_connecter_wss (io_thread_t *io_thread_,
                                                       bool wait_)
{
    ws_connecter_t *const connecter = new (std::nothrow) ws_connecter_t (
      io_thread_, this, options, _addr, wait_, true, _wss_hostname);
    return connecter;
}
#endif
717
#ifdef ZMQ_HAVE_OPENPGM
//  Starts the PGM/EPGM transport. Only PUB/XPUB and SUB/XSUB sockets are
//  allowed. PUB/XPUB sockets get a PGM sender engine, SUB/XSUB sockets a
//  PGM receiver engine; the engine is initialised from the address and
//  attached to this session. An init failure is fatal.
void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_)
{
    zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

    //  EPGM means PGM with UDP encapsulation.
    const bool encapsulate_in_udp = _addr->protocol == "epgm";
    const bool is_publisher =
      options.type == ZMQ_PUB || options.type == ZMQ_XPUB;

    //  The engine is attached to the session straight away: PGM has no
    //  concept of 'connect', so there is no point in delaying it.
    if (is_publisher) {
        //  PGM sender.
        pgm_sender_t *sender =
          new (std::nothrow) pgm_sender_t (io_thread_, options);
        alloc_assert (sender);

        const int rc =
          sender->init (encapsulate_in_udp, _addr->address.c_str ());
        errno_assert (rc == 0);

        send_attach (this, sender);
    } else {
        //  PGM receiver.
        pgm_receiver_t *receiver =
          new (std::nothrow) pgm_receiver_t (io_thread_, options);
        alloc_assert (receiver);

        const int rc =
          receiver->init (encapsulate_in_udp, _addr->address.c_str ());
        errno_assert (rc == 0);

        send_attach (this, receiver);
    }
}
#endif
754
#ifdef ZMQ_HAVE_NORM
//  Starts the NORM transport. A NORM engine is created, initialised from
//  the address and attached to this session. PUB/XPUB sockets initialise
//  the engine with the flag pair (true, false); every other socket type
//  (expected to be SUB or XSUB) uses (false, true). An init failure is
//  fatal.
void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_)
{
    //  The engine is attached to the session straight away: NORM has no
    //  concept of 'connect', so there is no point in delaying it.
    const bool is_sender = options.type == ZMQ_PUB || options.type == ZMQ_XPUB;

    norm_engine_t *norm_engine =
      new (std::nothrow) norm_engine_t (io_thread_, options);
    alloc_assert (norm_engine);

    //  Sender: (true, false). Receiver (ZMQ_SUB or ZMQ_XSUB): (false, true).
    const int rc =
      norm_engine->init (_addr->address.c_str (), is_sender, !is_sender);
    errno_assert (rc == 0);

    send_attach (this, norm_engine);
}
#endif
785
786void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/)
787{
788 zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
789 || options.type == ZMQ_DGRAM);
790
791 udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
792 alloc_assert (engine);
793
794 const bool recv = options.type == ZMQ_DISH || options.type == ZMQ_DGRAM;
795 const bool send = options.type == ZMQ_RADIO || options.type == ZMQ_DGRAM;
796
797 const int rc = engine->init (_addr, send, recv);
798 errno_assert (rc == 0);
799
800 send_attach (this, engine);
801}
802