1/*
2 Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include <new>
32#include <string>
33
34#include "macros.hpp"
35#include "ws_connecter.hpp"
36#include "io_thread.hpp"
37#include "err.hpp"
38#include "ip.hpp"
39#include "tcp.hpp"
40#include "address.hpp"
41#include "ws_address.hpp"
42#include "wss_address.hpp"
43#include "session_base.hpp"
44#include "ws_engine.hpp"
45
46#ifdef ZMQ_HAVE_WSS
47#include "wss_engine.hpp"
48#endif
49
50#if !defined ZMQ_HAVE_WINDOWS
51#include <unistd.h>
52#include <sys/types.h>
53#include <sys/socket.h>
54#include <arpa/inet.h>
55#include <netinet/tcp.h>
56#include <netinet/in.h>
57#include <netdb.h>
58#include <fcntl.h>
59#ifdef ZMQ_HAVE_VXWORKS
60#include <sockLib.h>
61#endif
62#ifdef ZMQ_HAVE_OPENVMS
63#include <ioctl.h>
64#endif
65#endif
66
67#ifdef __APPLE__
68#include <TargetConditionals.h>
69#endif
70
71zmq::ws_connecter_t::ws_connecter_t (class io_thread_t *io_thread_,
72 class session_base_t *session_,
73 const options_t &options_,
74 address_t *addr_,
75 bool delayed_start_,
76 bool wss_,
77 const char *tls_hostname_) :
78 stream_connecter_base_t (
79 io_thread_, session_, options_, addr_, delayed_start_),
80 _connect_timer_started (false),
81 _wss (wss_),
82 _hostname (tls_hostname_)
83{
84}
85
86zmq::ws_connecter_t::~ws_connecter_t ()
87{
88 zmq_assert (!_connect_timer_started);
89}
90
91void zmq::ws_connecter_t::process_term (int linger_)
92{
93 if (_connect_timer_started) {
94 cancel_timer (connect_timer_id);
95 _connect_timer_started = false;
96 }
97
98 stream_connecter_base_t::process_term (linger_);
99}
100
101void zmq::ws_connecter_t::out_event ()
102{
103 if (_connect_timer_started) {
104 cancel_timer (connect_timer_id);
105 _connect_timer_started = false;
106 }
107
108 // TODO this is still very similar to (t)ipc_connecter_t, maybe the
109 // differences can be factored out
110
111 rm_handle ();
112
113 const fd_t fd = connect ();
114
115 // Handle the error condition by attempt to reconnect.
116 if (fd == retired_fd || !tune_socket (fd)) {
117 close ();
118 add_reconnect_timer ();
119 return;
120 }
121
122 if (_wss)
123 create_engine (fd,
124 get_socket_name<wss_address_t> (fd, socket_end_local));
125 else
126 create_engine (fd,
127 get_socket_name<ws_address_t> (fd, socket_end_local));
128}
129
130void zmq::ws_connecter_t::timer_event (int id_)
131{
132 if (id_ == connect_timer_id) {
133 _connect_timer_started = false;
134 rm_handle ();
135 close ();
136 add_reconnect_timer ();
137 } else
138 stream_connecter_base_t::timer_event (id_);
139}
140
141void zmq::ws_connecter_t::start_connecting ()
142{
143 // Open the connecting socket.
144 const int rc = open ();
145
146 // Connect may succeed in synchronous manner.
147 if (rc == 0) {
148 _handle = add_fd (_s);
149 out_event ();
150 }
151
152 // Connection establishment may be delayed. Poll for its completion.
153 else if (rc == -1 && errno == EINPROGRESS) {
154 _handle = add_fd (_s);
155 set_pollout (_handle);
156 _socket->event_connect_delayed (
157 make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
158
159 // add userspace connect timeout
160 add_connect_timer ();
161 }
162
163 // Handle any other error condition by eventual reconnect.
164 else {
165 if (_s != retired_fd)
166 close ();
167 add_reconnect_timer ();
168 }
169}
170
171void zmq::ws_connecter_t::add_connect_timer ()
172{
173 if (options.connect_timeout > 0) {
174 add_timer (options.connect_timeout, connect_timer_id);
175 _connect_timer_started = true;
176 }
177}
178
179int zmq::ws_connecter_t::open ()
180{
181 zmq_assert (_s == retired_fd);
182
183 tcp_address_t tcp_addr;
184 _s = tcp_open_socket (_addr->address.c_str (), options, false, true,
185 &tcp_addr);
186 if (_s == retired_fd)
187 return -1;
188
189 // Set the socket to non-blocking mode so that we get async connect().
190 unblock_socket (_s);
191
192 // Connect to the remote peer.
193#if defined ZMQ_HAVE_VXWORKS
194 int rc = ::connect (_s, (sockaddr *) tcp_addr.addr (), tcp_addr.addrlen ());
195#else
196 int rc = ::connect (_s, tcp_addr.addr (), tcp_addr.addrlen ());
197#endif
198 // Connect was successful immediately.
199 if (rc == 0) {
200 return 0;
201 }
202
203 // Translate error codes indicating asynchronous connect has been
204 // launched to a uniform EINPROGRESS.
205#ifdef ZMQ_HAVE_WINDOWS
206 const int last_error = WSAGetLastError ();
207 if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
208 errno = EINPROGRESS;
209 else
210 errno = wsa_error_to_errno (last_error);
211#else
212 if (errno == EINTR)
213 errno = EINPROGRESS;
214#endif
215 return -1;
216}
217
218zmq::fd_t zmq::ws_connecter_t::connect ()
219{
220 // Async connect has finished. Check whether an error occurred
221 int err = 0;
222#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
223 int len = sizeof err;
224#else
225 socklen_t len = sizeof err;
226#endif
227
228 const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
229 reinterpret_cast<char *> (&err), &len);
230
231 // Assert if the error was caused by 0MQ bug.
232 // Networking problems are OK. No need to assert.
233#ifdef ZMQ_HAVE_WINDOWS
234 zmq_assert (rc == 0);
235 if (err != 0) {
236 if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
237 || err == WSAENOBUFS) {
238 wsa_assert_no (err);
239 }
240 return retired_fd;
241 }
242#else
243 // Following code should handle both Berkeley-derived socket
244 // implementations and Solaris.
245 if (rc == -1)
246 err = errno;
247 if (err != 0) {
248 errno = err;
249#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
250 errno_assert (errno != EBADF && errno != ENOPROTOOPT
251 && errno != ENOTSOCK && errno != ENOBUFS);
252#else
253 errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
254 && errno != ENOBUFS);
255#endif
256 return retired_fd;
257 }
258#endif
259
260 // Return the newly connected socket.
261 const fd_t result = _s;
262 _s = retired_fd;
263 return result;
264}
265
266bool zmq::ws_connecter_t::tune_socket (const fd_t fd_)
267{
268 const int rc =
269 tune_tcp_socket (fd_) | tune_tcp_maxrt (fd_, options.tcp_maxrt);
270 return rc == 0;
271}
272
273void zmq::ws_connecter_t::create_engine (fd_t fd_,
274 const std::string &local_address_)
275{
276 const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint,
277 endpoint_type_connect);
278
279 // Create the engine object for this connection.
280 i_engine *engine = NULL;
281 if (_wss)
282#ifdef ZMQ_HAVE_WSS
283 engine = new (std::nothrow)
284 wss_engine_t (fd_, options, endpoint_pair, *_addr->resolved.ws_addr,
285 true, NULL, _hostname);
286#else
287 assert (false);
288#endif
289 else
290 engine = new (std::nothrow) ws_engine_t (
291 fd_, options, endpoint_pair, *_addr->resolved.ws_addr, true);
292 alloc_assert (engine);
293
294 // Attach the engine to the corresponding session object.
295 send_attach (_session, engine);
296
297 // Shut the connecter down.
298 terminate ();
299
300 _socket->event_connected (endpoint_pair, fd_);
301}
302