/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
#include "macros.hpp"
#include "session_base.hpp"
#include "i_engine.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "likely.hpp"
#include "tcp_connecter.hpp"
#include "ws_connecter.hpp"
#include "ipc_connecter.hpp"
#include "tipc_connecter.hpp"
#include "socks_connecter.hpp"
#include "vmci_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "address.hpp"
#include "norm_engine.hpp"
#include "udp_engine.hpp"

#include "ctx.hpp"
#include "req.hpp"
#include "radio.hpp"
#include "dish.hpp"

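// Factory: create the session subtype that matches the socket type. REQ,
// RADIO and DISH need protocol-specific sessions; all other socket types use
// the plain base session. Returns NULL with errno set to EINVAL for unknown
// socket types.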
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
                                                  bool active_,
                                                  class socket_base_t *socket_,
                                                  const options_t &options_,
                                                  address_t *addr_)
{
    session_base_t *s = NULL;
    switch (options_.type) {
        case ZMQ_REQ:
            s = new (std::nothrow)
              req_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_RADIO:
            s = new (std::nothrow)
              radio_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DISH:
            s = new (std::nothrow)
              dish_session_t (io_thread_, active_, socket_, options_, addr_);
            break;
        case ZMQ_DEALER:
        case ZMQ_REP:
        case ZMQ_ROUTER:
        case ZMQ_PUB:
        case ZMQ_XPUB:
        case ZMQ_SUB:
        case ZMQ_XSUB:
        case ZMQ_PUSH:
        case ZMQ_PULL:
        case ZMQ_PAIR:
        case ZMQ_STREAM:
        case ZMQ_SERVER:
        case ZMQ_CLIENT:
        case ZMQ_GATHER:
        case ZMQ_SCATTER:
        case ZMQ_DGRAM:
            s = new (std::nothrow)
              session_base_t (io_thread_, active_, socket_, options_, addr_);
            break;
        default:
            errno = EINVAL;
            return NULL;
    }
    alloc_assert (s);
    return s;
}

zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
                                     bool active_,
                                     class socket_base_t *socket_,
                                     const options_t &options_,
                                     address_t *addr_) :
    own_t (io_thread_, options_),
    io_object_t (io_thread_),
    _active (active_),
    _pipe (NULL),
    _zap_pipe (NULL),
    _incomplete_in (false),
    _pending (false),
    _engine (NULL),
    _socket (socket_),
    _io_thread (io_thread_),
    _has_linger_timer (false),
    _addr (addr_),
    _wss_hostname (NULL)
{
    if (options_.wss_hostname.length () > 0) {
        _wss_hostname =
          static_cast<char *> (malloc (options_.wss_hostname.length () + 1));
        assert (_wss_hostname);
        strcpy (_wss_hostname, options_.wss_hostname.c_str ());
    }
}

const zmq::endpoint_uri_pair_t &zmq::session_base_t::get_endpoint () const
{
    return _engine->get_endpoint ();
}

zmq::session_base_t::~session_base_t ()
{
    zmq_assert (!_pipe);
    zmq_assert (!_zap_pipe);

    // If there's still a pending linger timer, remove it.
    if (_has_linger_timer) {
        cancel_timer (linger_timer_id);
        _has_linger_timer = false;
    }

    // Close the engine.
    if (_engine)
        _engine->terminate ();

    if (_wss_hostname)
        free (_wss_hostname);

    LIBZMQ_DELETE (_addr);
}

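// Attach the socket-facing pipe to this session. A session owns at most one
// data pipe at a time, so this is only valid while no pipe is attached.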
void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
{
    zmq_assert (!is_terminating ());
    zmq_assert (!_pipe);
    zmq_assert (pipe_);
    _pipe = pipe_;
    _pipe->set_event_sink (this);
}

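// Fetch the next outbound message from the pipe so the engine can send it.
// Fails with EAGAIN when no pipe is attached or no message is available, and
// remembers whether a multipart message has only been partially pulled.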
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
    if (!_pipe || !_pipe->read (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    _incomplete_in = (msg_->flags () & msg_t::more) != 0;

    return 0;
}

int zmq::session_base_t::push_msg (msg_t *msg_)
{
    // Only subscribe/cancel commands are passed on to the socket;
    // any other command message is dropped here.
    if ((msg_->flags () & msg_t::command) && !msg_->is_subscribe ()
        && !msg_->is_cancel ())
        return 0;
    if (_pipe && _pipe->write (msg_)) {
        int rc = msg_->init ();
        errno_assert (rc == 0);
        return 0;
    }

    errno = EAGAIN;
    return -1;
}

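// Read the next reply frame from the ZAP handler pipe. Fails with ENOTCONN
// when no ZAP pipe is attached and with EAGAIN when no message is available.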
int zmq::session_base_t::read_zap_msg (msg_t *msg_)
{
    if (_zap_pipe == NULL) {
        errno = ENOTCONN;
        return -1;
    }

    if (!_zap_pipe->read (msg_)) {
        errno = EAGAIN;
        return -1;
    }

    return 0;
}

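// Write a request frame to the ZAP handler pipe, flushing once the last
// frame of the request has been written.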
int zmq::session_base_t::write_zap_msg (msg_t *msg_)
{
    if (_zap_pipe == NULL || !_zap_pipe->write (msg_)) {
        errno = ENOTCONN;
        return -1;
    }

    if ((msg_->flags () & msg_t::more) == 0)
        _zap_pipe->flush ();

    const int rc = msg_->init ();
    errno_assert (rc == 0);
    return 0;
}

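// Hook for specialized sessions (e.g. the REQ session) to clear any
// per-connection state before a reconnect; the base session has nothing
// to reset.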
void zmq::session_base_t::reset ()
{
}

void zmq::session_base_t::flush ()
{
    if (_pipe)
        _pipe->flush ();
}

void zmq::session_base_t::rollback ()
{
    if (_pipe)
        _pipe->rollback ();
}

void zmq::session_base_t::clean_pipes ()
{
    zmq_assert (_pipe != NULL);

    // Get rid of half-processed messages in the out pipe. Flush any
    // unflushed messages upstream.
    _pipe->rollback ();
    _pipe->flush ();

    // Remove any half-read message from the in pipe.
    while (_incomplete_in) {
        msg_t msg;
        int rc = msg.init ();
        errno_assert (rc == 0);
        rc = pull_msg (&msg);
        errno_assert (rc == 0);
        rc = msg.close ();
        errno_assert (rc == 0);
    }
}

void zmq::session_base_t::pipe_terminated (pipe_t *pipe_)
{
    // Drop the reference to the deallocated pipe if required.
    zmq_assert (pipe_ == _pipe || pipe_ == _zap_pipe
                || _terminating_pipes.count (pipe_) == 1);

    if (pipe_ == _pipe) {
        // If this is our current pipe, remove it
        _pipe = NULL;
        if (_has_linger_timer) {
            cancel_timer (linger_timer_id);
            _has_linger_timer = false;
        }
    } else if (pipe_ == _zap_pipe)
        _zap_pipe = NULL;
    else
        // Remove the pipe from the detached pipes set
        _terminating_pipes.erase (pipe_);

    if (!is_terminating () && options.raw_socket) {
        if (_engine) {
            _engine->terminate ();
            _engine = NULL;
        }
        terminate ();
    }

    // If we are waiting for pending messages to be sent, at this point
    // we are sure that there will be no more messages and we can proceed
    // with termination safely.
    if (_pending && !_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
        _pending = false;
        own_t::process_term (0);
    }
}

void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
    // Skip activating if we're detaching this pipe
    if (unlikely (pipe_ != _pipe && pipe_ != _zap_pipe)) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (unlikely (_engine == NULL)) {
        _pipe->check_read ();
        return;
    }

    if (likely (pipe_ == _pipe))
        _engine->restart_output ();
    else {
        // i.e. pipe_ == _zap_pipe
        _engine->zap_msg_available ();
    }
}

void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
    // Skip activating if we're detaching this pipe
    if (_pipe != pipe_) {
        zmq_assert (_terminating_pipes.count (pipe_) == 1);
        return;
    }

    if (_engine)
        _engine->restart_input ();
}

void zmq::session_base_t::hiccuped (pipe_t *)
{
    // Hiccups are always sent from session to socket, not the other
    // way round.
    zmq_assert (false);
}

zmq::socket_base_t *zmq::session_base_t::get_socket ()
{
    return _socket;
}

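// Called when the session is plugged into its I/O thread. Connecting
// (active) sessions kick off the first connection attempt here.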
void zmq::session_base_t::process_plug ()
{
    if (_active)
        start_connecting (false);
}

// This function returns 0 on success, or -1 with errno set to ECONNREFUSED if
// ZAP is not set up (i.e. inproc://zeromq.zap.01 does not exist in the same
// context); it aborts on any other error. In other words, either ZAP is not
// configured, or if it is configured it MUST be configured correctly and it
// MUST work, otherwise authentication cannot be guaranteed and that would be
// a security flaw.
int zmq::session_base_t::zap_connect ()
{
    if (_zap_pipe != NULL)
        return 0;

    endpoint_t peer = find_endpoint ("inproc://zeromq.zap.01");
    if (peer.socket == NULL) {
        errno = ECONNREFUSED;
        return -1;
    }
    zmq_assert (peer.options.type == ZMQ_REP || peer.options.type == ZMQ_ROUTER
                || peer.options.type == ZMQ_SERVER);

    // Create a bi-directional pipe that will connect the
    // session with the ZAP socket.
    object_t *parents[2] = {this, peer.socket};
    pipe_t *new_pipes[2] = {NULL, NULL};
    int hwms[2] = {0, 0};
    bool conflates[2] = {false, false};
    int rc = pipepair (parents, new_pipes, hwms, conflates);
    errno_assert (rc == 0);

    // Attach local end of the pipe to this socket object.
    _zap_pipe = new_pipes[0];
    _zap_pipe->set_nodelay ();
    _zap_pipe->set_event_sink (this);

    send_bind (peer.socket, new_pipes[1], false);

    // Send empty routing id if required by the peer.
    if (peer.options.recv_routing_id) {
        msg_t id;
        rc = id.init ();
        errno_assert (rc == 0);
        id.set_flags (msg_t::routing_id);
        bool ok = _zap_pipe->write (&id);
        zmq_assert (ok);
        _zap_pipe->flush ();
    }

    return 0;
}

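// ZAP is considered enabled when a security mechanism other than NULL is
// selected, or when a ZAP domain is set even with the NULL mechanism.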
bool zmq::session_base_t::zap_enabled ()
{
    return (options.mechanism != ZMQ_NULL || !options.zap_domain.empty ());
}

void zmq::session_base_t::process_attach (i_engine *engine_)
{
    zmq_assert (engine_ != NULL);

    // Create the pipe if it does not exist yet.
    if (!_pipe && !is_terminating ()) {
        object_t *parents[2] = {this, _socket};
        pipe_t *pipes[2] = {NULL, NULL};

        const bool conflate = get_effective_conflate_option (options);

        int hwms[2] = {conflate ? -1 : options.rcvhwm,
                       conflate ? -1 : options.sndhwm};
        bool conflates[2] = {conflate, conflate};
        int rc = pipepair (parents, pipes, hwms, conflates);
        errno_assert (rc == 0);

        // Plug the local end of the pipe.
        pipes[0]->set_event_sink (this);

        // Remember the local end of the pipe.
        zmq_assert (!_pipe);
        _pipe = pipes[0];

        // The endpoint strings are not set on bind; set them here so that
        // events can use them.
        pipes[0]->set_endpoint_pair (engine_->get_endpoint ());
        pipes[1]->set_endpoint_pair (engine_->get_endpoint ());

        // Ask socket to plug into the remote end of the pipe.
        send_bind (_socket, pipes[1]);
    }

    // Plug in the engine.
    zmq_assert (!_engine);
    _engine = engine_;
    _engine->plug (_io_thread, this);
}

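// The engine reports a fatal error. Depending on the reason, either try to
// reconnect (connection/timeout errors on an active session) or terminate
// the session (protocol errors, or passive sessions).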
void zmq::session_base_t::engine_error (zmq::i_engine::error_reason_t reason_)
{
    // Engine is dead. Let's forget about it.
    _engine = NULL;

    // Remove any half-done messages from the pipes.
    if (_pipe)
        clean_pipes ();

    zmq_assert (reason_ == i_engine::connection_error
                || reason_ == i_engine::timeout_error
                || reason_ == i_engine::protocol_error);

    switch (reason_) {
        case i_engine::timeout_error:
            /* FALLTHROUGH */
        case i_engine::connection_error:
            if (_active) {
                reconnect ();
                break;
            }
            /* FALLTHROUGH */
        case i_engine::protocol_error:
            if (_pending) {
                if (_pipe)
                    _pipe->terminate (false);
                if (_zap_pipe)
                    _zap_pipe->terminate (false);
            } else {
                terminate ();
            }
            break;
    }

    // Just in case there's only a delimiter in the pipe.
    if (_pipe)
        _pipe->check_read ();

    if (_zap_pipe)
        _zap_pipe->check_read ();
}

void zmq::session_base_t::process_term (int linger_)
{
    zmq_assert (!_pending);

    // If the termination of the pipe happens before the term command is
    // delivered there's nothing much to do. We can proceed with the
    // standard termination immediately.
    if (!_pipe && !_zap_pipe && _terminating_pipes.empty ()) {
        own_t::process_term (0);
        return;
    }

    _pending = true;

    if (_pipe != NULL) {
        // If there's a finite linger value, delay the termination.
        // If linger is infinite (negative) we don't even need to set
        // the timer.
        if (linger_ > 0) {
            zmq_assert (!_has_linger_timer);
            add_timer (linger_, linger_timer_id);
            _has_linger_timer = true;
        }

        // Start pipe termination process. Delay the termination till all
        // messages are processed in case the linger time is non-zero.
        _pipe->terminate (linger_ != 0);

        // TODO: Should this go into pipe_t::terminate?
        // If there's no engine and only a delimiter left in the pipe, it
        // would never be read. Thus we check for it explicitly.
        if (!_engine)
            _pipe->check_read ();
    }

    if (_zap_pipe != NULL)
        _zap_pipe->terminate (false);
}

void zmq::session_base_t::timer_event (int id_)
{
    // Linger period expired. We can proceed with termination even though
    // there are still pending messages to be sent.
    zmq_assert (id_ == linger_timer_id);
    _has_linger_timer = false;

    // Ask pipe to terminate even though there may be pending messages in it.
    zmq_assert (_pipe);
    _pipe->terminate (false);
}

void zmq::session_base_t::reconnect ()
{
    // For delayed connect situations, terminate the pipe
    // and reestablish later on
    if (_pipe && options.immediate == 1 && _addr->protocol != "pgm"
        && _addr->protocol != "epgm" && _addr->protocol != "norm"
        && _addr->protocol != protocol_name::udp) {
        _pipe->hiccup ();
        _pipe->terminate (false);
        _terminating_pipes.insert (_pipe);
        _pipe = NULL;

        if (_has_linger_timer) {
            cancel_timer (linger_timer_id);
            _has_linger_timer = false;
        }
    }

    reset ();

    // Reconnect.
    if (options.reconnect_ivl != -1)
        start_connecting (true);
    else {
        std::string *ep = new (std::string);
        _addr->to_string (*ep);
        send_term_endpoint (_socket, ep);
    }

    // For subscriber sockets we hiccup the inbound pipe, which will cause
    // the socket object to resend all the subscriptions.
    if (_pipe
        && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB
            || options.type == ZMQ_DISH))
        _pipe->hiccup ();
}

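// Table mapping transport protocol names to the member functions that create
// the matching connecter objects; entries are only compiled in when the
// corresponding transport is available.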
zmq::session_base_t::connecter_factory_entry_t
  zmq::session_base_t::_connecter_factories[] = {
    connecter_factory_entry_t (protocol_name::tcp,
                               &zmq::session_base_t::create_connecter_tcp),
#ifdef ZMQ_HAVE_WS
    connecter_factory_entry_t (protocol_name::ws,
                               &zmq::session_base_t::create_connecter_ws),
#endif
#ifdef ZMQ_HAVE_WSS
    connecter_factory_entry_t (protocol_name::wss,
                               &zmq::session_base_t::create_connecter_wss),
#endif
#if defined ZMQ_HAVE_IPC
    connecter_factory_entry_t (protocol_name::ipc,
                               &zmq::session_base_t::create_connecter_ipc),
#endif
#if defined ZMQ_HAVE_TIPC
    connecter_factory_entry_t (protocol_name::tipc,
                               &zmq::session_base_t::create_connecter_tipc),
#endif
#if defined ZMQ_HAVE_VMCI
    connecter_factory_entry_t (protocol_name::vmci,
                               &zmq::session_base_t::create_connecter_vmci),
#endif
};

zmq::session_base_t::connecter_factory_map_t
  zmq::session_base_t::_connecter_factories_map (
    _connecter_factories,
    _connecter_factories
      + sizeof (_connecter_factories) / sizeof (_connecter_factories[0]));

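// Table for connection-less transports (UDP, PGM/EPGM, NORM) that attach an
// engine directly instead of going through a connecter object.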
zmq::session_base_t::start_connecting_entry_t
  zmq::session_base_t::_start_connecting_entries[] = {
    start_connecting_entry_t (protocol_name::udp,
                              &zmq::session_base_t::start_connecting_udp),
#if defined ZMQ_HAVE_OPENPGM
    start_connecting_entry_t ("pgm",
                              &zmq::session_base_t::start_connecting_pgm),
    start_connecting_entry_t ("epgm",
                              &zmq::session_base_t::start_connecting_pgm),
#endif
#if defined ZMQ_HAVE_NORM
    start_connecting_entry_t ("norm",
                              &zmq::session_base_t::start_connecting_norm),
#endif
};

zmq::session_base_t::start_connecting_map_t
  zmq::session_base_t::_start_connecting_map (
    _start_connecting_entries,
    _start_connecting_entries
      + sizeof (_start_connecting_entries)
          / sizeof (_start_connecting_entries[0]));

void zmq::session_base_t::start_connecting (bool wait_)
{
    zmq_assert (_active);

    // Choose I/O thread to run connecter in. Given that we are already
    // running in an I/O thread, there must be at least one available.
    io_thread_t *io_thread = choose_io_thread (options.affinity);
    zmq_assert (io_thread);

    // Create the connecter object.
    const connecter_factory_map_t::const_iterator connecter_factories_it =
      _connecter_factories_map.find (_addr->protocol);
    if (connecter_factories_it != _connecter_factories_map.end ()) {
        own_t *connecter =
          (this->*connecter_factories_it->second) (io_thread, wait_);

        alloc_assert (connecter);
        launch_child (connecter);
        return;
    }
    const start_connecting_map_t::const_iterator start_connecting_it =
      _start_connecting_map.find (_addr->protocol);
    if (start_connecting_it != _start_connecting_map.end ()) {
        (this->*start_connecting_it->second) (io_thread);
        return;
    }

    zmq_assert (false);
}

#if defined ZMQ_HAVE_VMCI
zmq::own_t *zmq::session_base_t::create_connecter_vmci (io_thread_t *io_thread_,
                                                        bool wait_)
{
    return new (std::nothrow)
      vmci_connecter_t (io_thread_, this, options, _addr, wait_);
}
#endif

#if defined ZMQ_HAVE_TIPC
zmq::own_t *zmq::session_base_t::create_connecter_tipc (io_thread_t *io_thread_,
                                                        bool wait_)
{
    return new (std::nothrow)
      tipc_connecter_t (io_thread_, this, options, _addr, wait_);
}
#endif

#if defined ZMQ_HAVE_IPC
zmq::own_t *zmq::session_base_t::create_connecter_ipc (io_thread_t *io_thread_,
                                                       bool wait_)
{
    return new (std::nothrow)
      ipc_connecter_t (io_thread_, this, options, _addr, wait_);
}
#endif

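// TCP connections either go directly through a tcp_connecter_t or, when a
// SOCKS proxy is configured, through a socks_connecter_t with optional
// username/password authentication.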
zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_,
                                                       bool wait_)
{
    if (!options.socks_proxy_address.empty ()) {
        address_t *proxy_address = new (std::nothrow) address_t (
          protocol_name::tcp, options.socks_proxy_address, this->get_ctx ());
        alloc_assert (proxy_address);
        socks_connecter_t *connecter = new (std::nothrow) socks_connecter_t (
          io_thread_, this, options, _addr, proxy_address, wait_);
        alloc_assert (connecter);
        if (!options.socks_proxy_username.empty ()) {
            connecter->set_auth_method_basic (options.socks_proxy_username,
                                              options.socks_proxy_password);
        }
        return connecter;
    }
    return new (std::nothrow)
      tcp_connecter_t (io_thread_, this, options, _addr, wait_);
}

#ifdef ZMQ_HAVE_WS
zmq::own_t *zmq::session_base_t::create_connecter_ws (io_thread_t *io_thread_,
                                                      bool wait_)
{
    return new (std::nothrow)
      ws_connecter_t (io_thread_, this, options, _addr, wait_, false, NULL);
}
#endif

#ifdef ZMQ_HAVE_WSS
zmq::own_t *zmq::session_base_t::create_connecter_wss (io_thread_t *io_thread_,
                                                       bool wait_)
{
    return new (std::nothrow) ws_connecter_t (io_thread_, this, options, _addr,
                                              wait_, true, _wss_hostname);
}
#endif

#ifdef ZMQ_HAVE_OPENPGM
void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_)
{
    zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB
                || options.type == ZMQ_SUB || options.type == ZMQ_XSUB);

    // For the EPGM transport, UDP encapsulation of PGM is used.
    bool const udp_encapsulation = _addr->protocol == "epgm";

    // At this point we'll create message pipes to the session straight
    // away. There's no point in delaying it as no concept of 'connect'
    // exists with PGM anyway.
    if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
        // PGM sender.
        pgm_sender_t *pgm_sender =
          new (std::nothrow) pgm_sender_t (io_thread_, options);
        alloc_assert (pgm_sender);

        int rc = pgm_sender->init (udp_encapsulation, _addr->address.c_str ());
        errno_assert (rc == 0);

        send_attach (this, pgm_sender);
    } else {
        // PGM receiver.
        pgm_receiver_t *pgm_receiver =
          new (std::nothrow) pgm_receiver_t (io_thread_, options);
        alloc_assert (pgm_receiver);

        int rc =
          pgm_receiver->init (udp_encapsulation, _addr->address.c_str ());
        errno_assert (rc == 0);

        send_attach (this, pgm_receiver);
    }
}
#endif

#ifdef ZMQ_HAVE_NORM
void zmq::session_base_t::start_connecting_norm (io_thread_t *io_thread_)
{
    // At this point we'll create message pipes to the session straight
    // away. There's no point in delaying it as no concept of 'connect'
    // exists with NORM anyway.
    if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) {
        // NORM sender.
        norm_engine_t *norm_sender =
          new (std::nothrow) norm_engine_t (io_thread_, options);
        alloc_assert (norm_sender);

        int rc = norm_sender->init (_addr->address.c_str (), true, false);
        errno_assert (rc == 0);

        send_attach (this, norm_sender);
    } else { // ZMQ_SUB or ZMQ_XSUB

        // NORM receiver.
        norm_engine_t *norm_receiver =
          new (std::nothrow) norm_engine_t (io_thread_, options);
        alloc_assert (norm_receiver);

        int rc = norm_receiver->init (_addr->address.c_str (), false, true);
        errno_assert (rc == 0);

        send_attach (this, norm_receiver);
    }
}
#endif

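// UDP has no connection phase, so the engine is created and attached right
// away. RADIO sockets only send, DISH sockets only receive, and DGRAM
// sockets do both.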
void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/)
{
    zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
                || options.type == ZMQ_DGRAM);

    udp_engine_t *engine = new (std::nothrow) udp_engine_t (options);
    alloc_assert (engine);

    const bool recv = options.type == ZMQ_DISH || options.type == ZMQ_DGRAM;
    const bool send = options.type == ZMQ_RADIO || options.type == ZMQ_DGRAM;

    const int rc = engine->init (_addr, send, recv);
    errno_assert (rc == 0);

    send_attach (this, engine);
}