| 1 | /* |
| 2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
| 3 | |
| 4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
| 5 | |
| 6 | libzmq is free software; you can redistribute it and/or modify it under |
| 7 | the terms of the GNU Lesser General Public License (LGPL) as published |
| 8 | by the Free Software Foundation; either version 3 of the License, or |
| 9 | (at your option) any later version. |
| 10 | |
| 11 | As a special exception, the Contributors give you permission to link |
| 12 | this library with independent modules to produce an executable, |
| 13 | regardless of the license terms of these independent modules, and to |
| 14 | copy and distribute the resulting executable under terms of your choice, |
| 15 | provided that you also meet, for each linked independent module, the |
| 16 | terms and conditions of the license of that module. An independent |
| 17 | module is a module which is not derived from or based on this library. |
| 18 | If you modify this library, you must extend this exception to your |
| 19 | version of the library. |
| 20 | |
| 21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
| 22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| 23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
| 24 | License for more details. |
| 25 | |
| 26 | You should have received a copy of the GNU Lesser General Public License |
| 27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 28 | */ |
| 29 | |
| 30 | #include "precompiled.hpp" |
| 31 | #include "stream_connecter_base.hpp" |
| 32 | #include "session_base.hpp" |
| 33 | #include "address.hpp" |
| 34 | #include "random.hpp" |
| 35 | #include "zmtp_engine.hpp" |
| 36 | #include "raw_engine.hpp" |
| 37 | |
| 38 | #ifndef ZMQ_HAVE_WINDOWS |
| 39 | #include <unistd.h> |
| 40 | #else |
| 41 | #include <winsock2.h> |
| 42 | #endif |
| 43 | |
| 44 | #include <limits> |
| 45 | |
| 46 | zmq::stream_connecter_base_t::stream_connecter_base_t ( |
| 47 | zmq::io_thread_t *io_thread_, |
| 48 | zmq::session_base_t *session_, |
| 49 | const zmq::options_t &options_, |
| 50 | zmq::address_t *addr_, |
| 51 | bool delayed_start_) : |
| 52 | own_t (io_thread_, options_), |
| 53 | io_object_t (io_thread_), |
| 54 | _addr (addr_), |
| 55 | _s (retired_fd), |
| 56 | _handle (static_cast<handle_t> (NULL)), |
| 57 | _socket (session_->get_socket ()), |
| 58 | _session (session_), |
| 59 | _delayed_start (delayed_start_), |
| 60 | _reconnect_timer_started (false), |
| 61 | _current_reconnect_ivl (options.reconnect_ivl) |
| 62 | { |
| 63 | zmq_assert (_addr); |
| 64 | _addr->to_string (_endpoint); |
| 65 | // TODO the return value is unused! what if it fails? if this is impossible |
| 66 | // or does not matter, change such that endpoint in initialized using an |
| 67 | // initializer, and make endpoint const |
| 68 | } |
| 69 | |
| 70 | zmq::stream_connecter_base_t::~stream_connecter_base_t () |
| 71 | { |
| 72 | zmq_assert (!_reconnect_timer_started); |
| 73 | zmq_assert (!_handle); |
| 74 | zmq_assert (_s == retired_fd); |
| 75 | } |
| 76 | |
| 77 | void zmq::stream_connecter_base_t::process_plug () |
| 78 | { |
| 79 | if (_delayed_start) |
| 80 | add_reconnect_timer (); |
| 81 | else |
| 82 | start_connecting (); |
| 83 | } |
| 84 | |
| 85 | void zmq::stream_connecter_base_t::process_term (int linger_) |
| 86 | { |
| 87 | if (_reconnect_timer_started) { |
| 88 | cancel_timer (reconnect_timer_id); |
| 89 | _reconnect_timer_started = false; |
| 90 | } |
| 91 | |
| 92 | if (_handle) { |
| 93 | rm_handle (); |
| 94 | } |
| 95 | |
| 96 | if (_s != retired_fd) |
| 97 | close (); |
| 98 | |
| 99 | own_t::process_term (linger_); |
| 100 | } |
| 101 | |
| 102 | void zmq::stream_connecter_base_t::add_reconnect_timer () |
| 103 | { |
| 104 | if (options.reconnect_ivl != -1) { |
| 105 | const int interval = get_new_reconnect_ivl (); |
| 106 | add_timer (interval, reconnect_timer_id); |
| 107 | _socket->event_connect_retried ( |
| 108 | make_unconnected_connect_endpoint_pair (_endpoint), interval); |
| 109 | _reconnect_timer_started = true; |
| 110 | } |
| 111 | } |
| 112 | |
| 113 | int zmq::stream_connecter_base_t::get_new_reconnect_ivl () |
| 114 | { |
| 115 | // TODO should the random jitter be really based on the configured initial |
| 116 | // reconnect interval options.reconnect_ivl, or better on the |
| 117 | // _current_reconnect_ivl? |
| 118 | |
| 119 | // The new interval is the current interval + random value. |
| 120 | const int random_jitter = generate_random () % options.reconnect_ivl; |
| 121 | const int interval = |
| 122 | _current_reconnect_ivl < std::numeric_limits<int>::max () - random_jitter |
| 123 | ? _current_reconnect_ivl + random_jitter |
| 124 | : std::numeric_limits<int>::max (); |
| 125 | |
| 126 | // Only change the new current reconnect interval if the maximum reconnect |
| 127 | // interval was set and if it's larger than the reconnect interval. |
| 128 | if (options.reconnect_ivl_max > 0 |
| 129 | && options.reconnect_ivl_max > options.reconnect_ivl) { |
| 130 | // Calculate the next interval |
| 131 | _current_reconnect_ivl = |
| 132 | _current_reconnect_ivl < std::numeric_limits<int>::max () / 2 |
| 133 | ? std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max) |
| 134 | : options.reconnect_ivl_max; |
| 135 | } |
| 136 | |
| 137 | return interval; |
| 138 | } |
| 139 | |
| 140 | void zmq::stream_connecter_base_t::rm_handle () |
| 141 | { |
| 142 | rm_fd (_handle); |
| 143 | _handle = static_cast<handle_t> (NULL); |
| 144 | } |
| 145 | |
| 146 | void zmq::stream_connecter_base_t::close () |
| 147 | { |
| 148 | // TODO before, this was an assertion for _s != retired_fd, but this does not match usage of close |
| 149 | if (_s != retired_fd) { |
| 150 | #ifdef ZMQ_HAVE_WINDOWS |
| 151 | const int rc = closesocket (_s); |
| 152 | wsa_assert (rc != SOCKET_ERROR); |
| 153 | #else |
| 154 | const int rc = ::close (_s); |
| 155 | errno_assert (rc == 0); |
| 156 | #endif |
| 157 | _socket->event_closed ( |
| 158 | make_unconnected_connect_endpoint_pair (_endpoint), _s); |
| 159 | _s = retired_fd; |
| 160 | } |
| 161 | } |
| 162 | |
| 163 | void zmq::stream_connecter_base_t::in_event () |
| 164 | { |
| 165 | // We are not polling for incoming data, so we are actually called |
| 166 | // because of error here. However, we can get error on out event as well |
| 167 | // on some platforms, so we'll simply handle both events in the same way. |
| 168 | out_event (); |
| 169 | } |
| 170 | |
| 171 | void zmq::stream_connecter_base_t::create_engine ( |
| 172 | fd_t fd_, const std::string &local_address_) |
| 173 | { |
| 174 | const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint, |
| 175 | endpoint_type_connect); |
| 176 | |
| 177 | // Create the engine object for this connection. |
| 178 | i_engine *engine; |
| 179 | if (options.raw_socket) |
| 180 | engine = new (std::nothrow) raw_engine_t (fd_, options, endpoint_pair); |
| 181 | else |
| 182 | engine = new (std::nothrow) zmtp_engine_t (fd_, options, endpoint_pair); |
| 183 | alloc_assert (engine); |
| 184 | |
| 185 | // Attach the engine to the corresponding session object. |
| 186 | send_attach (_session, engine); |
| 187 | |
| 188 | // Shut the connecter down. |
| 189 | terminate (); |
| 190 | |
| 191 | _socket->event_connected (endpoint_pair, fd_); |
| 192 | } |
| 193 | |
| 194 | void zmq::stream_connecter_base_t::timer_event (int id_) |
| 195 | { |
| 196 | zmq_assert (id_ == reconnect_timer_id); |
| 197 | _reconnect_timer_started = false; |
| 198 | start_connecting (); |
| 199 | } |
| 200 | |