1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include <new>
32#include <string>
33
34#include "macros.hpp"
35#include "socks_connecter.hpp"
36#include "random.hpp"
37#include "err.hpp"
38#include "ip.hpp"
39#include "tcp.hpp"
40#include "address.hpp"
41#include "tcp_address.hpp"
42#include "session_base.hpp"
43#include "socks.hpp"
44
45#ifndef ZMQ_HAVE_WINDOWS
46#include <unistd.h>
47#include <sys/types.h>
48#include <sys/socket.h>
49#if defined ZMQ_HAVE_VXWORKS
50#include <sockLib.h>
51#endif
52#endif
53
54zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
55 class session_base_t *session_,
56 const options_t &options_,
57 address_t *addr_,
58 address_t *proxy_addr_,
59 bool delayed_start_) :
60 stream_connecter_base_t (
61 io_thread_, session_, options_, addr_, delayed_start_),
62 _proxy_addr (proxy_addr_),
63 _auth_method (socks_no_auth_required),
64 _status (unplugged)
65{
66 zmq_assert (_addr->protocol == protocol_name::tcp);
67 _proxy_addr->to_string (_endpoint);
68}
69
70zmq::socks_connecter_t::~socks_connecter_t ()
71{
72 LIBZMQ_DELETE (_proxy_addr);
73}
74
75void zmq::socks_connecter_t::set_auth_method_none ()
76{
77 _auth_method = socks_no_auth_required;
78 _auth_username.clear ();
79 _auth_password.clear ();
80}
81
82void zmq::socks_connecter_t::set_auth_method_basic (
83 const std::string &username_, const std::string &password_)
84{
85 _auth_method = socks_basic_auth;
86 _auth_username = username_;
87 _auth_password = password_;
88}
89
90void zmq::socks_connecter_t::in_event ()
91{
92 int expected_status = -1;
93 zmq_assert (_status != unplugged);
94
95 if (_status == waiting_for_choice) {
96 int rc = _choice_decoder.input (_s);
97 if (rc == 0 || rc == -1)
98 error ();
99 else if (_choice_decoder.message_ready ()) {
100 const socks_choice_t choice = _choice_decoder.decode ();
101 rc = process_server_response (choice);
102 if (rc == -1)
103 error ();
104 else {
105 if (choice.method == socks_basic_auth)
106 expected_status = sending_basic_auth_request;
107 else
108 expected_status = sending_request;
109 }
110 }
111 } else if (_status == waiting_for_auth_response) {
112 int rc = _auth_response_decoder.input (_s);
113 if (rc == 0 || rc == -1)
114 error ();
115 else if (_auth_response_decoder.message_ready ()) {
116 const socks_auth_response_t auth_response =
117 _auth_response_decoder.decode ();
118 rc = process_server_response (auth_response);
119 if (rc == -1)
120 error ();
121 else {
122 expected_status = sending_request;
123 }
124 }
125 } else if (_status == waiting_for_response) {
126 int rc = _response_decoder.input (_s);
127 if (rc == 0 || rc == -1)
128 error ();
129 else if (_response_decoder.message_ready ()) {
130 const socks_response_t response = _response_decoder.decode ();
131 rc = process_server_response (response);
132 if (rc == -1)
133 error ();
134 else {
135 rm_handle ();
136 create_engine (
137 _s, get_socket_name<tcp_address_t> (_s, socket_end_local));
138 _s = -1;
139 _status = unplugged;
140 }
141 }
142 } else
143 error ();
144
145 if (expected_status == sending_basic_auth_request) {
146 _basic_auth_request_encoder.encode (
147 socks_basic_auth_request_t (_auth_username, _auth_password));
148 reset_pollin (_handle);
149 set_pollout (_handle);
150 _status = sending_basic_auth_request;
151 } else if (expected_status == sending_request) {
152 std::string hostname;
153 uint16_t port = 0;
154 if (parse_address (_addr->address, hostname, port) == -1)
155 error ();
156 else {
157 _request_encoder.encode (socks_request_t (1, hostname, port));
158 reset_pollin (_handle);
159 set_pollout (_handle);
160 _status = sending_request;
161 }
162 }
163}
164
165void zmq::socks_connecter_t::out_event ()
166{
167 zmq_assert (
168 _status == waiting_for_proxy_connection || _status == sending_greeting
169 || _status == sending_basic_auth_request || _status == sending_request);
170
171 if (_status == waiting_for_proxy_connection) {
172 const int rc = static_cast<int> (check_proxy_connection ());
173 if (rc == -1)
174 error ();
175 else {
176 _greeting_encoder.encode (socks_greeting_t (_auth_method));
177 _status = sending_greeting;
178 }
179 } else if (_status == sending_greeting) {
180 zmq_assert (_greeting_encoder.has_pending_data ());
181 const int rc = _greeting_encoder.output (_s);
182 if (rc == -1 || rc == 0)
183 error ();
184 else if (!_greeting_encoder.has_pending_data ()) {
185 reset_pollout (_handle);
186 set_pollin (_handle);
187 _status = waiting_for_choice;
188 }
189 } else if (_status == sending_basic_auth_request) {
190 zmq_assert (_basic_auth_request_encoder.has_pending_data ());
191 const int rc = _basic_auth_request_encoder.output (_s);
192 if (rc == -1 || rc == 0)
193 error ();
194 else if (!_basic_auth_request_encoder.has_pending_data ()) {
195 reset_pollout (_handle);
196 set_pollin (_handle);
197 _status = waiting_for_auth_response;
198 }
199 } else {
200 zmq_assert (_request_encoder.has_pending_data ());
201 const int rc = _request_encoder.output (_s);
202 if (rc == -1 || rc == 0)
203 error ();
204 else if (!_request_encoder.has_pending_data ()) {
205 reset_pollout (_handle);
206 set_pollin (_handle);
207 _status = waiting_for_response;
208 }
209 }
210}
211
212void zmq::socks_connecter_t::start_connecting ()
213{
214 zmq_assert (_status == unplugged);
215
216 // Open the connecting socket.
217 const int rc = connect_to_proxy ();
218
219 // Connect may succeed in synchronous manner.
220 if (rc == 0) {
221 _handle = add_fd (_s);
222 set_pollout (_handle);
223 _status = sending_greeting;
224 }
225 // Connection establishment may be delayed. Poll for its completion.
226 else if (errno == EINPROGRESS) {
227 _handle = add_fd (_s);
228 set_pollout (_handle);
229 _status = waiting_for_proxy_connection;
230 _socket->event_connect_delayed (
231 make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
232 }
233 // Handle any other error condition by eventual reconnect.
234 else {
235 if (_s != retired_fd)
236 close ();
237 add_reconnect_timer ();
238 }
239}
240
241int zmq::socks_connecter_t::process_server_response (
242 const socks_choice_t &response_)
243{
244 return response_.method == socks_no_auth_required
245 || response_.method == socks_basic_auth
246 ? 0
247 : -1;
248}
249
250int zmq::socks_connecter_t::process_server_response (
251 const socks_response_t &response_)
252{
253 return response_.response_code == 0 ? 0 : -1;
254}
255
256int zmq::socks_connecter_t::process_server_response (
257 const socks_auth_response_t &response_)
258{
259 return response_.response_code == 0 ? 0 : -1;
260}
261
262void zmq::socks_connecter_t::error ()
263{
264 rm_fd (_handle);
265 close ();
266 _greeting_encoder.reset ();
267 _choice_decoder.reset ();
268 _basic_auth_request_encoder.reset ();
269 _auth_response_decoder.reset ();
270 _request_encoder.reset ();
271 _response_decoder.reset ();
272 _status = unplugged;
273 add_reconnect_timer ();
274}
275
276int zmq::socks_connecter_t::connect_to_proxy ()
277{
278 zmq_assert (_s == retired_fd);
279
280 // Resolve the address
281 if (_proxy_addr->resolved.tcp_addr != NULL) {
282 LIBZMQ_DELETE (_proxy_addr->resolved.tcp_addr);
283 }
284
285 _proxy_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
286 alloc_assert (_proxy_addr->resolved.tcp_addr);
287 // Automatic fallback to ipv4 is disabled here since this was the existing
288 // behaviour, however I don't see a real reason for this. Maybe this can
289 // be changed to true (and then the parameter can be removed entirely).
290 _s = tcp_open_socket (_proxy_addr->address.c_str (), options, false, false,
291 _proxy_addr->resolved.tcp_addr);
292 if (_s == retired_fd) {
293 // TODO we should emit some event in this case!
294 LIBZMQ_DELETE (_proxy_addr->resolved.tcp_addr);
295 return -1;
296 }
297 zmq_assert (_proxy_addr->resolved.tcp_addr != NULL);
298
299 // Set the socket to non-blocking mode so that we get async connect().
300 unblock_socket (_s);
301
302 const tcp_address_t *const tcp_addr = _proxy_addr->resolved.tcp_addr;
303
304 int rc;
305
306 // Set a source address for conversations
307 if (tcp_addr->has_src_addr ()) {
308#if defined ZMQ_HAVE_VXWORKS
309 rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (),
310 tcp_addr->src_addrlen ());
311#else
312 rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
313#endif
314 if (rc == -1) {
315 close ();
316 return -1;
317 }
318 }
319
320 // Connect to the remote peer.
321#if defined ZMQ_HAVE_VXWORKS
322 rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ());
323#else
324 rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ());
325#endif
326 // Connect was successful immediately.
327 if (rc == 0)
328 return 0;
329
330 // Translate error codes indicating asynchronous connect has been
331 // launched to a uniform EINPROGRESS.
332#ifdef ZMQ_HAVE_WINDOWS
333 const int last_error = WSAGetLastError ();
334 if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
335 errno = EINPROGRESS;
336 else {
337 errno = wsa_error_to_errno (last_error);
338 close ();
339 }
340#else
341 if (errno == EINTR)
342 errno = EINPROGRESS;
343#endif
344 return -1;
345}
346
347zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
348{
349 // Async connect has finished. Check whether an error occurred
350 int err = 0;
351#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
352 int len = sizeof err;
353#else
354 socklen_t len = sizeof err;
355#endif
356
357 int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
358 reinterpret_cast<char *> (&err), &len);
359
360 // Assert if the error was caused by 0MQ bug.
361 // Networking problems are OK. No need to assert.
362#ifdef ZMQ_HAVE_WINDOWS
363 zmq_assert (rc == 0);
364 if (err != 0) {
365 wsa_assert (err == WSAECONNREFUSED || err == WSAETIMEDOUT
366 || err == WSAECONNABORTED || err == WSAEHOSTUNREACH
367 || err == WSAENETUNREACH || err == WSAENETDOWN
368 || err == WSAEACCES || err == WSAEINVAL
369 || err == WSAEADDRINUSE);
370 return -1;
371 }
372#else
373 // Following code should handle both Berkeley-derived socket
374 // implementations and Solaris.
375 if (rc == -1)
376 err = errno;
377 if (err != 0) {
378 errno = err;
379 errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
380 || errno == ETIMEDOUT || errno == EHOSTUNREACH
381 || errno == ENETUNREACH || errno == ENETDOWN
382 || errno == EINVAL);
383 return -1;
384 }
385#endif
386
387 rc = tune_tcp_socket (_s);
388 rc = rc
389 | tune_tcp_keepalives (
390 _s, options.tcp_keepalive, options.tcp_keepalive_cnt,
391 options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
392 if (rc != 0)
393 return -1;
394
395 return 0;
396}
397
398int zmq::socks_connecter_t::parse_address (const std::string &address_,
399 std::string &hostname_,
400 uint16_t &port_)
401{
402 // Find the ':' at end that separates address from the port number.
403 const size_t idx = address_.rfind (':');
404 if (idx == std::string::npos) {
405 errno = EINVAL;
406 return -1;
407 }
408
409 // Extract hostname
410 if (idx < 2 || address_[0] != '[' || address_[idx - 1] != ']')
411 hostname_ = address_.substr (0, idx);
412 else
413 hostname_ = address_.substr (1, idx - 2);
414
415 // Separate the hostname/port.
416 const std::string port_str = address_.substr (idx + 1);
417 // Parse the port number (0 is not a valid port).
418 port_ = static_cast<uint16_t> (atoi (port_str.c_str ()));
419 if (port_ == 0) {
420 errno = EINVAL;
421 return -1;
422 }
423 return 0;
424}
425