1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | #include "stream_connecter_base.hpp" |
32 | #include "session_base.hpp" |
33 | #include "address.hpp" |
34 | #include "random.hpp" |
35 | #include "zmtp_engine.hpp" |
36 | #include "raw_engine.hpp" |
37 | |
38 | #ifndef ZMQ_HAVE_WINDOWS |
39 | #include <unistd.h> |
40 | #else |
41 | #include <winsock2.h> |
42 | #endif |
43 | |
44 | #include <limits> |
45 | |
46 | zmq::stream_connecter_base_t::stream_connecter_base_t ( |
47 | zmq::io_thread_t *io_thread_, |
48 | zmq::session_base_t *session_, |
49 | const zmq::options_t &options_, |
50 | zmq::address_t *addr_, |
51 | bool delayed_start_) : |
52 | own_t (io_thread_, options_), |
53 | io_object_t (io_thread_), |
54 | _addr (addr_), |
55 | _s (retired_fd), |
56 | _handle (static_cast<handle_t> (NULL)), |
57 | _socket (session_->get_socket ()), |
58 | _session (session_), |
59 | _delayed_start (delayed_start_), |
60 | _reconnect_timer_started (false), |
61 | _current_reconnect_ivl (options.reconnect_ivl) |
62 | { |
63 | zmq_assert (_addr); |
64 | _addr->to_string (_endpoint); |
65 | // TODO the return value is unused! what if it fails? if this is impossible |
66 | // or does not matter, change such that endpoint in initialized using an |
67 | // initializer, and make endpoint const |
68 | } |
69 | |
70 | zmq::stream_connecter_base_t::~stream_connecter_base_t () |
71 | { |
72 | zmq_assert (!_reconnect_timer_started); |
73 | zmq_assert (!_handle); |
74 | zmq_assert (_s == retired_fd); |
75 | } |
76 | |
77 | void zmq::stream_connecter_base_t::process_plug () |
78 | { |
79 | if (_delayed_start) |
80 | add_reconnect_timer (); |
81 | else |
82 | start_connecting (); |
83 | } |
84 | |
85 | void zmq::stream_connecter_base_t::process_term (int linger_) |
86 | { |
87 | if (_reconnect_timer_started) { |
88 | cancel_timer (reconnect_timer_id); |
89 | _reconnect_timer_started = false; |
90 | } |
91 | |
92 | if (_handle) { |
93 | rm_handle (); |
94 | } |
95 | |
96 | if (_s != retired_fd) |
97 | close (); |
98 | |
99 | own_t::process_term (linger_); |
100 | } |
101 | |
102 | void zmq::stream_connecter_base_t::add_reconnect_timer () |
103 | { |
104 | if (options.reconnect_ivl != -1) { |
105 | const int interval = get_new_reconnect_ivl (); |
106 | add_timer (interval, reconnect_timer_id); |
107 | _socket->event_connect_retried ( |
108 | make_unconnected_connect_endpoint_pair (_endpoint), interval); |
109 | _reconnect_timer_started = true; |
110 | } |
111 | } |
112 | |
113 | int zmq::stream_connecter_base_t::get_new_reconnect_ivl () |
114 | { |
115 | // TODO should the random jitter be really based on the configured initial |
116 | // reconnect interval options.reconnect_ivl, or better on the |
117 | // _current_reconnect_ivl? |
118 | |
119 | // The new interval is the current interval + random value. |
120 | const int random_jitter = generate_random () % options.reconnect_ivl; |
121 | const int interval = |
122 | _current_reconnect_ivl < std::numeric_limits<int>::max () - random_jitter |
123 | ? _current_reconnect_ivl + random_jitter |
124 | : std::numeric_limits<int>::max (); |
125 | |
126 | // Only change the new current reconnect interval if the maximum reconnect |
127 | // interval was set and if it's larger than the reconnect interval. |
128 | if (options.reconnect_ivl_max > 0 |
129 | && options.reconnect_ivl_max > options.reconnect_ivl) { |
130 | // Calculate the next interval |
131 | _current_reconnect_ivl = |
132 | _current_reconnect_ivl < std::numeric_limits<int>::max () / 2 |
133 | ? std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max) |
134 | : options.reconnect_ivl_max; |
135 | } |
136 | |
137 | return interval; |
138 | } |
139 | |
140 | void zmq::stream_connecter_base_t::rm_handle () |
141 | { |
142 | rm_fd (_handle); |
143 | _handle = static_cast<handle_t> (NULL); |
144 | } |
145 | |
146 | void zmq::stream_connecter_base_t::close () |
147 | { |
148 | // TODO before, this was an assertion for _s != retired_fd, but this does not match usage of close |
149 | if (_s != retired_fd) { |
150 | #ifdef ZMQ_HAVE_WINDOWS |
151 | const int rc = closesocket (_s); |
152 | wsa_assert (rc != SOCKET_ERROR); |
153 | #else |
154 | const int rc = ::close (_s); |
155 | errno_assert (rc == 0); |
156 | #endif |
157 | _socket->event_closed ( |
158 | make_unconnected_connect_endpoint_pair (_endpoint), _s); |
159 | _s = retired_fd; |
160 | } |
161 | } |
162 | |
163 | void zmq::stream_connecter_base_t::in_event () |
164 | { |
165 | // We are not polling for incoming data, so we are actually called |
166 | // because of error here. However, we can get error on out event as well |
167 | // on some platforms, so we'll simply handle both events in the same way. |
168 | out_event (); |
169 | } |
170 | |
171 | void zmq::stream_connecter_base_t::create_engine ( |
172 | fd_t fd_, const std::string &local_address_) |
173 | { |
174 | const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint, |
175 | endpoint_type_connect); |
176 | |
177 | // Create the engine object for this connection. |
178 | i_engine *engine; |
179 | if (options.raw_socket) |
180 | engine = new (std::nothrow) raw_engine_t (fd_, options, endpoint_pair); |
181 | else |
182 | engine = new (std::nothrow) zmtp_engine_t (fd_, options, endpoint_pair); |
183 | alloc_assert (engine); |
184 | |
185 | // Attach the engine to the corresponding session object. |
186 | send_attach (_session, engine); |
187 | |
188 | // Shut the connecter down. |
189 | terminate (); |
190 | |
191 | _socket->event_connected (endpoint_pair, fd_); |
192 | } |
193 | |
194 | void zmq::stream_connecter_base_t::timer_event (int id_) |
195 | { |
196 | zmq_assert (id_ == reconnect_timer_id); |
197 | _reconnect_timer_started = false; |
198 | start_connecting (); |
199 | } |
200 | |