/*
    Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "precompiled.hpp"
#include <new>
#include <string>

#include "macros.hpp"
#include "ws_connecter.hpp"
#include "io_thread.hpp"
#include "err.hpp"
#include "ip.hpp"
#include "tcp.hpp"
#include "address.hpp"
#include "ws_address.hpp"
#include "wss_address.hpp"
#include "session_base.hpp"
#include "ws_engine.hpp"

#ifdef ZMQ_HAVE_WSS
#include "wss_engine.hpp"
#endif

#if !defined ZMQ_HAVE_WINDOWS
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <netinet/in.h>
#include <netdb.h>
#include <fcntl.h>
#ifdef ZMQ_HAVE_VXWORKS
#include <sockLib.h>
#endif
#ifdef ZMQ_HAVE_OPENVMS
#include <ioctl.h>
#endif
#endif

#ifdef __APPLE__
#include <TargetConditionals.h>
#endif

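//  wss_ selects a TLS-secured (wss://) connection; tls_hostname_ is the
//  hostname passed on to the TLS engine when such a connection is made.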
zmq::ws_connecter_t::ws_connecter_t (class io_thread_t *io_thread_,
                                     class session_base_t *session_,
                                     const options_t &options_,
                                     address_t *addr_,
                                     bool delayed_start_,
                                     bool wss_,
                                     const char *tls_hostname_) :
    stream_connecter_base_t (
      io_thread_, session_, options_, addr_, delayed_start_),
    _connect_timer_started (false),
    _wss (wss_),
    _hostname (tls_hostname_)
{
}

zmq::ws_connecter_t::~ws_connecter_t ()
{
    zmq_assert (!_connect_timer_started);
}

void zmq::ws_connecter_t::process_term (int linger_)
{
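    //  Cancel the pending connect timer, if any, before letting the base
    //  class shut the connecter down.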
    if (_connect_timer_started) {
        cancel_timer (connect_timer_id);
        _connect_timer_started = false;
    }

    stream_connecter_base_t::process_term (linger_);
}

void zmq::ws_connecter_t::out_event ()
{
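    //  The socket has become writable, meaning the asynchronous connect has
    //  completed (successfully or not), so the userspace connect timer is no
    //  longer needed.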
    if (_connect_timer_started) {
        cancel_timer (connect_timer_id);
        _connect_timer_started = false;
    }

    //  TODO: this is still very similar to (t)ipc_connecter_t; maybe the
    //  differences can be factored out.

    rm_handle ();

    const fd_t fd = connect ();

    //  Handle the error condition by attempting to reconnect.
    if (fd == retired_fd || !tune_socket (fd)) {
        close ();
        add_reconnect_timer ();
        return;
    }

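    //  The local endpoint name is rendered with the scheme that matches the
    //  transport in use: wss:// when TLS is enabled, ws:// otherwise.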
    if (_wss)
        create_engine (fd,
                       get_socket_name<wss_address_t> (fd, socket_end_local));
    else
        create_engine (fd,
                       get_socket_name<ws_address_t> (fd, socket_end_local));
}

void zmq::ws_connecter_t::timer_event (int id_)
{
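    //  The connect timer fired before the asynchronous connect completed:
    //  give up on this attempt and schedule a reconnect. All other timers
    //  are handled by the base class.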
    if (id_ == connect_timer_id) {
        _connect_timer_started = false;
        rm_handle ();
        close ();
        add_reconnect_timer ();
    } else
        stream_connecter_base_t::timer_event (id_);
}

void zmq::ws_connecter_t::start_connecting ()
{
    //  Open the connecting socket.
    const int rc = open ();

    //  Connect may succeed synchronously.
    if (rc == 0) {
        _handle = add_fd (_s);
        out_event ();
    }

    //  Connection establishment may be delayed. Poll for its completion.
    else if (rc == -1 && errno == EINPROGRESS) {
        _handle = add_fd (_s);
        set_pollout (_handle);
        _socket->event_connect_delayed (
          make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());

        //  Add a userspace connect timeout.
        add_connect_timer ();
    }

    //  Handle any other error condition by eventual reconnect.
    else {
        if (_s != retired_fd)
            close ();
        add_reconnect_timer ();
    }
}

void zmq::ws_connecter_t::add_connect_timer ()
{
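    //  The timer is armed only when a connect timeout (ZMQ_CONNECT_TIMEOUT)
    //  has been configured on the socket.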
    if (options.connect_timeout > 0) {
        add_timer (options.connect_timeout, connect_timer_id);
        _connect_timer_started = true;
    }
}

int zmq::ws_connecter_t::open ()
{
    zmq_assert (_s == retired_fd);

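    //  Resolve the address and open the underlying TCP socket; WS and WSS
    //  both run over a plain TCP connection.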
    tcp_address_t tcp_addr;
    _s = tcp_open_socket (_addr->address.c_str (), options, false, true,
                          &tcp_addr);
    if (_s == retired_fd)
        return -1;

    //  Set the socket to non-blocking mode so that we get async connect().
    unblock_socket (_s);

    //  Connect to the remote peer.
#if defined ZMQ_HAVE_VXWORKS
    int rc = ::connect (_s, (sockaddr *) tcp_addr.addr (), tcp_addr.addrlen ());
#else
    int rc = ::connect (_s, tcp_addr.addr (), tcp_addr.addrlen ());
#endif
    //  Connect was successful immediately.
    if (rc == 0) {
        return 0;
    }

    //  Translate error codes indicating that an asynchronous connect has been
    //  launched into a uniform EINPROGRESS.
#ifdef ZMQ_HAVE_WINDOWS
    const int last_error = WSAGetLastError ();
    if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
        errno = EINPROGRESS;
    else
        errno = wsa_error_to_errno (last_error);
#else
    if (errno == EINTR)
        errno = EINPROGRESS;
#endif
    return -1;
}

zmq::fd_t zmq::ws_connecter_t::connect ()
{
    //  The async connect has finished. Check whether an error occurred.
    int err = 0;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
    int len = sizeof err;
#else
    socklen_t len = sizeof err;
#endif

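    //  SO_ERROR retrieves (and clears) the deferred error from the
    //  non-blocking connect.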
    const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
                               reinterpret_cast<char *> (&err), &len);

    //  Assert if the error was caused by a 0MQ bug.
    //  Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
    zmq_assert (rc == 0);
    if (err != 0) {
        if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
            || err == WSAENOBUFS) {
            wsa_assert_no (err);
        }
        return retired_fd;
    }
#else
    //  The following code should handle both Berkeley-derived socket
    //  implementations and Solaris.
    if (rc == -1)
        err = errno;
    if (err != 0) {
        errno = err;
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
        errno_assert (errno != EBADF && errno != ENOPROTOOPT
                      && errno != ENOTSOCK && errno != ENOBUFS);
#else
        errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
                      && errno != ENOBUFS);
#endif
        return retired_fd;
    }
#endif

    //  Return the newly connected socket.
    const fd_t result = _s;
    _s = retired_fd;
    return result;
}

bool zmq::ws_connecter_t::tune_socket (const fd_t fd_)
{
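    //  A bitwise OR (rather than ||) ensures both tuning calls are evaluated;
    //  the combined result is 0 only if both of them succeed.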
    const int rc =
      tune_tcp_socket (fd_) | tune_tcp_maxrt (fd_, options.tcp_maxrt);
    return rc == 0;
}

void zmq::ws_connecter_t::create_engine (fd_t fd_,
                                         const std::string &local_address_)
{
    const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint,
                                             endpoint_type_connect);

    //  Create the engine object for this connection.
    i_engine *engine = NULL;
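    //  A wss:// connection can only be handled when libzmq was built with TLS
    //  support (ZMQ_HAVE_WSS); otherwise reaching the WSS branch is an
    //  internal error.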
    if (_wss)
#ifdef ZMQ_HAVE_WSS
        engine = new (std::nothrow)
          wss_engine_t (fd_, options, endpoint_pair, *_addr->resolved.ws_addr,
                        true, NULL, _hostname);
#else
        assert (false);
#endif
    else
        engine = new (std::nothrow) ws_engine_t (
          fd_, options, endpoint_pair, *_addr->resolved.ws_addr, true);
    alloc_assert (engine);

    //  Attach the engine to the corresponding session object.
    send_attach (_session, engine);

    //  Shut the connecter down.
    terminate ();

    _socket->event_connected (endpoint_pair, fd_);
}