1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__ |
31 | #define __ZMQ_SOCKET_BASE_HPP_INCLUDED__ |
32 | |
33 | #include <string> |
34 | #include <map> |
35 | #include <stdarg.h> |
36 | |
37 | #include "own.hpp" |
38 | #include "array.hpp" |
39 | #include "blob.hpp" |
40 | #include "stdint.hpp" |
41 | #include "poller.hpp" |
42 | #include "i_poll_events.hpp" |
43 | #include "i_mailbox.hpp" |
44 | #include "clock.hpp" |
45 | #include "pipe.hpp" |
46 | #include "endpoint.hpp" |
47 | |
//  Declares zmq_free_event with C language linkage. Only the declaration
//  lives in this header; the definition is provided elsewhere.
//  NOTE(review): the (data_, hint_) parameter pair presumably follows the
//  message free-callback convention - confirm against the definition.
extern "C" void zmq_free_event (void *data_, void *hint_);
51 | |
52 | namespace zmq |
53 | { |
//  Forward declarations of the types referenced by the socket classes
//  declared below.
class pipe_t;
class msg_t;
class ctx_t;
57 | |
58 | class socket_base_t : public own_t, |
59 | public array_item_t<>, |
60 | public i_poll_events, |
61 | public i_pipe_events |
62 | { |
63 | friend class reaper_t; |
64 | |
65 | public: |
66 | // Returns false if object is not a socket. |
67 | bool check_tag () const; |
68 | |
69 | // Returns whether the socket is thread-safe. |
70 | bool is_thread_safe () const; |
71 | |
72 | // Create a socket of a specified type. |
73 | static socket_base_t * |
74 | create (int type_, zmq::ctx_t *parent_, uint32_t tid_, int sid_); |
75 | |
76 | // Returns the mailbox associated with this socket. |
77 | i_mailbox *get_mailbox () const; |
78 | |
79 | // Interrupt blocking call if the socket is stuck in one. |
80 | // This function can be called from a different thread! |
81 | void stop (); |
82 | |
83 | // Interface for communication with the API layer. |
84 | int setsockopt (int option_, const void *optval_, size_t optvallen_); |
85 | int getsockopt (int option_, void *optval_, size_t *optvallen_); |
86 | int bind (const char *endpoint_uri_); |
87 | int connect (const char *endpoint_uri_); |
88 | int term_endpoint (const char *endpoint_uri_); |
89 | int send (zmq::msg_t *msg_, int flags_); |
90 | int recv (zmq::msg_t *msg_, int flags_); |
91 | void add_signaler (signaler_t *s_); |
92 | void remove_signaler (signaler_t *s_); |
93 | int close (); |
94 | |
95 | // These functions are used by the polling mechanism to determine |
96 | // which events are to be reported from this socket. |
97 | bool has_in (); |
98 | bool has_out (); |
99 | |
100 | // Joining and leaving groups |
101 | int join (const char *group_); |
102 | int leave (const char *group_); |
103 | |
104 | // Using this function reaper thread ask the socket to register with |
105 | // its poller. |
106 | void start_reaping (poller_t *poller_); |
107 | |
108 | // i_poll_events implementation. This interface is used when socket |
109 | // is handled by the poller in the reaper thread. |
110 | void in_event (); |
111 | void out_event (); |
112 | void timer_event (int id_); |
113 | |
114 | // i_pipe_events interface implementation. |
115 | void read_activated (pipe_t *pipe_); |
116 | void write_activated (pipe_t *pipe_); |
117 | void hiccuped (pipe_t *pipe_); |
118 | void pipe_terminated (pipe_t *pipe_); |
119 | void lock (); |
120 | void unlock (); |
121 | |
122 | int monitor (const char *endpoint_, |
123 | uint64_t events_, |
124 | int event_version_, |
125 | int type_); |
126 | |
127 | void event_connected (const endpoint_uri_pair_t &endpoint_uri_pair_, |
128 | zmq::fd_t fd_); |
129 | void event_connect_delayed (const endpoint_uri_pair_t &endpoint_uri_pair_, |
130 | int err_); |
131 | void event_connect_retried (const endpoint_uri_pair_t &endpoint_uri_pair_, |
132 | int interval_); |
133 | void event_listening (const endpoint_uri_pair_t &endpoint_uri_pair_, |
134 | zmq::fd_t fd_); |
135 | void event_bind_failed (const endpoint_uri_pair_t &endpoint_uri_pair_, |
136 | int err_); |
137 | void event_accepted (const endpoint_uri_pair_t &endpoint_uri_pair_, |
138 | zmq::fd_t fd_); |
139 | void event_accept_failed (const endpoint_uri_pair_t &endpoint_uri_pair_, |
140 | int err_); |
141 | void event_closed (const endpoint_uri_pair_t &endpoint_uri_pair_, |
142 | zmq::fd_t fd_); |
143 | void event_close_failed (const endpoint_uri_pair_t &endpoint_uri_pair_, |
144 | int err_); |
145 | void event_disconnected (const endpoint_uri_pair_t &endpoint_uri_pair_, |
146 | zmq::fd_t fd_); |
147 | void event_handshake_failed_no_detail ( |
148 | const endpoint_uri_pair_t &endpoint_uri_pair_, int err_); |
149 | void event_handshake_failed_protocol ( |
150 | const endpoint_uri_pair_t &endpoint_uri_pair_, int err_); |
151 | void |
152 | event_handshake_failed_auth (const endpoint_uri_pair_t &endpoint_uri_pair_, |
153 | int err_); |
154 | void |
155 | event_handshake_succeeded (const endpoint_uri_pair_t &endpoint_uri_pair_, |
156 | int err_); |
157 | |
158 | // Query the state of a specific peer. The default implementation |
159 | // always returns an ENOTSUP error. |
160 | virtual int get_peer_state (const void *routing_id_, |
161 | size_t routing_id_size_) const; |
162 | |
163 | // Request for pipes statistics - will generate a ZMQ_EVENT_PIPES_STATS |
164 | // after gathering the data asynchronously. Requires event monitoring to |
165 | // be enabled. |
166 | int query_pipes_stats (); |
167 | |
168 | protected: |
169 | socket_base_t (zmq::ctx_t *parent_, |
170 | uint32_t tid_, |
171 | int sid_, |
172 | bool thread_safe_ = false); |
173 | virtual ~socket_base_t (); |
174 | |
175 | // Concrete algorithms for the x- methods are to be defined by |
176 | // individual socket types. |
177 | virtual void xattach_pipe (zmq::pipe_t *pipe_, |
178 | bool subscribe_to_all_ = false, |
179 | bool locally_initiated_ = false) = 0; |
180 | |
181 | // The default implementation assumes there are no specific socket |
182 | // options for the particular socket type. If not so, override this |
183 | // method. |
184 | virtual int |
185 | xsetsockopt (int option_, const void *optval_, size_t optvallen_); |
186 | |
187 | // The default implementation assumes that send is not supported. |
188 | virtual bool xhas_out (); |
189 | virtual int xsend (zmq::msg_t *msg_); |
190 | |
191 | // The default implementation assumes that recv in not supported. |
192 | virtual bool xhas_in (); |
193 | virtual int xrecv (zmq::msg_t *msg_); |
194 | |
195 | // i_pipe_events will be forwarded to these functions. |
196 | virtual void xread_activated (pipe_t *pipe_); |
197 | virtual void xwrite_activated (pipe_t *pipe_); |
198 | virtual void xhiccuped (pipe_t *pipe_); |
199 | virtual void xpipe_terminated (pipe_t *pipe_) = 0; |
200 | |
201 | // the default implementation assumes that joub and leave are not supported. |
202 | virtual int xjoin (const char *group_); |
203 | virtual int xleave (const char *group_); |
204 | |
205 | // Delay actual destruction of the socket. |
206 | void process_destroy (); |
207 | |
208 | private: |
209 | // test if event should be sent and then dispatch it |
210 | void event (const endpoint_uri_pair_t &endpoint_uri_pair_, |
211 | uint64_t values_[], |
212 | uint64_t values_count_, |
213 | uint64_t type_); |
214 | |
215 | // Socket event data dispatch |
216 | void monitor_event (uint64_t event_, |
217 | uint64_t values_[], |
218 | uint64_t values_count_, |
219 | const endpoint_uri_pair_t &endpoint_uri_pair_) const; |
220 | |
221 | // Monitor socket cleanup |
222 | void stop_monitor (bool send_monitor_stopped_event_ = true); |
223 | |
224 | // Creates new endpoint ID and adds the endpoint to the map. |
225 | void add_endpoint (const endpoint_uri_pair_t &endpoint_pair_, |
226 | own_t *endpoint_, |
227 | pipe_t *pipe_); |
228 | |
229 | // Map of open endpoints. |
230 | typedef std::pair<own_t *, pipe_t *> endpoint_pipe_t; |
231 | typedef std::multimap<std::string, endpoint_pipe_t> endpoints_t; |
232 | endpoints_t _endpoints; |
233 | |
234 | // Map of open inproc endpoints. |
235 | class inprocs_t |
236 | { |
237 | public: |
238 | void emplace (const char *endpoint_uri_, pipe_t *pipe_); |
239 | int erase_pipes (const std::string &endpoint_uri_str_); |
240 | void erase_pipe (pipe_t *pipe_); |
241 | |
242 | private: |
243 | typedef std::multimap<std::string, pipe_t *> map_t; |
244 | map_t _inprocs; |
245 | }; |
246 | inprocs_t _inprocs; |
247 | |
248 | // To be called after processing commands or invoking any command |
249 | // handlers explicitly. If required, it will deallocate the socket. |
250 | void check_destroy (); |
251 | |
252 | // Moves the flags from the message to local variables, |
253 | // to be later retrieved by getsockopt. |
254 | void (msg_t *msg_); |
255 | |
256 | // Used to check whether the object is a socket. |
257 | uint32_t _tag; |
258 | |
259 | // If true, associated context was already terminated. |
260 | bool _ctx_terminated; |
261 | |
262 | // If true, object should have been already destroyed. However, |
263 | // destruction is delayed while we unwind the stack to the point |
264 | // where it doesn't intersect the object being destroyed. |
265 | bool _destroyed; |
266 | |
267 | // Parse URI string. |
268 | static int |
269 | parse_uri (const char *uri_, std::string &protocol_, std::string &path_); |
270 | |
271 | // Check whether transport protocol, as specified in connect or |
272 | // bind, is available and compatible with the socket type. |
273 | int check_protocol (const std::string &protocol_) const; |
274 | |
275 | // Register the pipe with this socket. |
276 | void attach_pipe (zmq::pipe_t *pipe_, |
277 | bool subscribe_to_all_ = false, |
278 | bool locally_initiated_ = false); |
279 | |
280 | // Processes commands sent to this socket (if any). If timeout is -1, |
281 | // returns only after at least one command was processed. |
282 | // If throttle argument is true, commands are processed at most once |
283 | // in a predefined time period. |
284 | int process_commands (int timeout_, bool throttle_); |
285 | |
286 | // Handlers for incoming commands. |
287 | void process_stop (); |
288 | void process_bind (zmq::pipe_t *pipe_); |
289 | void process_pipe_stats_publish (uint64_t outbound_queue_count_, |
290 | uint64_t inbound_queue_count_, |
291 | endpoint_uri_pair_t *endpoint_pair_); |
292 | void process_term (int linger_); |
293 | void process_term_endpoint (std::string *endpoint_); |
294 | |
295 | void update_pipe_options (int option_); |
296 | |
297 | std::string resolve_tcp_addr (std::string endpoint_uri_, |
298 | const char *tcp_address_); |
299 | |
300 | // Socket's mailbox object. |
301 | i_mailbox *_mailbox; |
302 | |
303 | // List of attached pipes. |
304 | typedef array_t<pipe_t, 3> pipes_t; |
305 | pipes_t _pipes; |
306 | |
307 | // Reaper's poller and handle of this socket within it. |
308 | poller_t *_poller; |
309 | poller_t::handle_t _handle; |
310 | |
311 | // Timestamp of when commands were processed the last time. |
312 | uint64_t _last_tsc; |
313 | |
314 | // Number of messages received since last command processing. |
315 | int _ticks; |
316 | |
317 | // True if the last message received had MORE flag set. |
318 | bool _rcvmore; |
319 | |
320 | // Improves efficiency of time measurement. |
321 | clock_t _clock; |
322 | |
323 | // Monitor socket; |
324 | void *_monitor_socket; |
325 | |
326 | // Bitmask of events being monitored |
327 | int64_t _monitor_events; |
328 | |
329 | // Last socket endpoint resolved URI |
330 | std::string _last_endpoint; |
331 | |
332 | // Indicate if the socket is thread safe |
333 | const bool _thread_safe; |
334 | |
335 | // Signaler to be used in the reaping stage |
336 | signaler_t *_reaper_signaler; |
337 | |
338 | // Mutex for synchronize access to the socket in thread safe mode |
339 | mutex_t _sync; |
340 | |
341 | // Mutex to synchronize access to the monitor Pair socket |
342 | mutex_t _monitor_sync; |
343 | |
344 | ZMQ_NON_COPYABLE_NOR_MOVABLE (socket_base_t) |
345 | }; |
346 | |
347 | class routing_socket_base_t : public socket_base_t |
348 | { |
349 | protected: |
350 | routing_socket_base_t (class ctx_t *parent_, uint32_t tid_, int sid_); |
351 | ~routing_socket_base_t (); |
352 | |
353 | // methods from socket_base_t |
354 | virtual int |
355 | xsetsockopt (int option_, const void *optval_, size_t optvallen_); |
356 | virtual void xwrite_activated (pipe_t *pipe_); |
357 | |
358 | // own methods |
359 | std::string (); |
360 | bool connect_routing_id_is_set () const; |
361 | |
362 | struct out_pipe_t |
363 | { |
364 | pipe_t *pipe; |
365 | bool active; |
366 | }; |
367 | |
368 | void add_out_pipe (blob_t routing_id_, pipe_t *pipe_); |
369 | bool has_out_pipe (const blob_t &routing_id_) const; |
370 | out_pipe_t *lookup_out_pipe (const blob_t &routing_id_); |
371 | const out_pipe_t *lookup_out_pipe (const blob_t &routing_id_) const; |
372 | void erase_out_pipe (pipe_t *pipe_); |
373 | out_pipe_t try_erase_out_pipe (const blob_t &routing_id_); |
374 | template <typename Func> bool any_of_out_pipes (Func func_) |
375 | { |
376 | bool res = false; |
377 | for (out_pipes_t::iterator it = _out_pipes.begin (); |
378 | it != _out_pipes.end () && !res; ++it) { |
379 | res |= func_ (*it->second.pipe); |
380 | } |
381 | |
382 | return res; |
383 | } |
384 | |
385 | private: |
386 | // Outbound pipes indexed by the peer IDs. |
387 | typedef std::map<blob_t, out_pipe_t> out_pipes_t; |
388 | out_pipes_t _out_pipes; |
389 | |
390 | // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types |
391 | std::string _connect_routing_id; |
392 | }; |
393 | } |
394 | |
395 | #endif |
396 | |