1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #ifndef __ZMQ_STREAM_ENGINE_BASE_HPP_INCLUDED__ |
31 | #define __ZMQ_STREAM_ENGINE_BASE_HPP_INCLUDED__ |
32 | |
#include <stddef.h>
#include <string>

#include "fd.hpp"
#include "i_engine.hpp"
#include "io_object.hpp"
#include "i_encoder.hpp"
#include "i_decoder.hpp"
#include "options.hpp"
#include "socket_base.hpp"
#include "metadata.hpp"
#include "msg.hpp"
#include "tcp.hpp"
45 | |
46 | namespace zmq |
47 | { |
48 | class io_thread_t; |
49 | class session_base_t; |
50 | class mechanism_t; |
51 | |
52 | // This engine handles any socket with SOCK_STREAM semantics, |
53 | // e.g. TCP socket or an UNIX domain socket. |
54 | |
55 | class stream_engine_base_t : public io_object_t, public i_engine |
56 | { |
57 | public: |
58 | stream_engine_base_t (fd_t fd_, |
59 | const options_t &options_, |
60 | const endpoint_uri_pair_t &endpoint_uri_pair_); |
61 | ~stream_engine_base_t (); |
62 | |
63 | // i_engine interface implementation. |
64 | void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); |
65 | void terminate (); |
66 | bool restart_input (); |
67 | void restart_output (); |
68 | void zap_msg_available (); |
69 | const endpoint_uri_pair_t &get_endpoint () const; |
70 | |
71 | // i_poll_events interface implementation. |
72 | void in_event (); |
73 | void out_event (); |
74 | void timer_event (int id_); |
75 | |
76 | protected: |
77 | typedef metadata_t::dict_t properties_t; |
78 | bool init_properties (properties_t &properties_); |
79 | |
80 | // Function to handle network disconnections. |
81 | virtual void error (error_reason_t reason_); |
82 | |
83 | int next_handshake_command (msg_t *msg_); |
84 | int process_handshake_command (msg_t *msg_); |
85 | |
86 | int pull_msg_from_session (msg_t *msg_); |
87 | int push_msg_to_session (msg_t *msg_); |
88 | |
89 | int pull_and_encode (msg_t *msg_); |
90 | int decode_and_push (msg_t *msg_); |
91 | |
92 | void set_handshake_timer (); |
93 | |
94 | virtual bool handshake () { return true; }; |
95 | virtual void plug_internal (){}; |
96 | |
97 | virtual int process_command_message (msg_t *msg_) { return -1; }; |
98 | virtual int produce_ping_message (msg_t *msg_) { return -1; }; |
99 | virtual int process_heartbeat_message (msg_t *msg_) { return -1; }; |
100 | virtual int produce_pong_message (msg_t *msg_) { return -1; }; |
101 | |
102 | virtual int read (void *data, size_t size_); |
103 | virtual int write (const void *data_, size_t size_); |
104 | |
105 | void reset_pollout () { io_object_t::reset_pollout (_handle); } |
106 | void set_pollout () { io_object_t::set_pollout (_handle); } |
107 | void set_pollin () { io_object_t::set_pollin (_handle); } |
108 | session_base_t *session () { return _session; } |
109 | socket_base_t *socket () { return _socket; } |
110 | |
111 | const options_t _options; |
112 | |
113 | unsigned char *_inpos; |
114 | size_t _insize; |
115 | i_decoder *_decoder; |
116 | |
117 | unsigned char *_outpos; |
118 | size_t _outsize; |
119 | i_encoder *_encoder; |
120 | |
121 | mechanism_t *_mechanism; |
122 | |
123 | int (stream_engine_base_t::*_next_msg) (msg_t *msg_); |
124 | int (stream_engine_base_t::*_process_msg) (msg_t *msg_); |
125 | |
126 | // Metadata to be attached to received messages. May be NULL. |
127 | metadata_t *_metadata; |
128 | |
129 | // True iff the engine couldn't consume the last decoded message. |
130 | bool _input_stopped; |
131 | |
132 | // True iff the engine doesn't have any message to encode. |
133 | bool _output_stopped; |
134 | |
135 | // Representation of the connected endpoints. |
136 | const endpoint_uri_pair_t _endpoint_uri_pair; |
137 | |
138 | // ID of the handshake timer |
139 | enum |
140 | { |
141 | handshake_timer_id = 0x40 |
142 | }; |
143 | |
144 | // True is linger timer is running. |
145 | bool _has_handshake_timer; |
146 | |
147 | // Heartbeat stuff |
148 | enum |
149 | { |
150 | heartbeat_ivl_timer_id = 0x80, |
151 | heartbeat_timeout_timer_id = 0x81, |
152 | heartbeat_ttl_timer_id = 0x82 |
153 | }; |
154 | bool _has_ttl_timer; |
155 | bool _has_timeout_timer; |
156 | bool _has_heartbeat_timer; |
157 | |
158 | |
159 | const std::string _peer_address; |
160 | |
161 | private: |
162 | bool in_event_internal (); |
163 | |
164 | // Unplug the engine from the session. |
165 | void unplug (); |
166 | |
167 | int write_credential (msg_t *msg_); |
168 | int push_one_then_decode_and_push (msg_t *msg_); |
169 | |
170 | void mechanism_ready (); |
171 | |
172 | // Underlying socket. |
173 | fd_t _s; |
174 | |
175 | handle_t _handle; |
176 | |
177 | bool _plugged; |
178 | |
179 | // When true, we are still trying to determine whether |
180 | // the peer is using versioned protocol, and if so, which |
181 | // version. When false, normal message flow has started. |
182 | bool _handshaking; |
183 | |
184 | msg_t _tx_msg; |
185 | |
186 | bool _io_error; |
187 | |
188 | // The session this engine is attached to. |
189 | zmq::session_base_t *_session; |
190 | |
191 | // Socket |
192 | zmq::socket_base_t *_socket; |
193 | |
194 | ZMQ_NON_COPYABLE_NOR_MOVABLE (stream_engine_base_t) |
195 | }; |
196 | } |
197 | |
198 | #endif |
199 | |