/*
    Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file

    This file is part of libzmq, the ZeroMQ core engine in C++.

    libzmq is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License (LGPL) as published
    by the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    As a special exception, the Contributors give you permission to link
    this library with independent modules to produce an executable,
    regardless of the license terms of these independent modules, and to
    copy and distribute the resulting executable under terms of your choice,
    provided that you also meet, for each linked independent module, the
    terms and conditions of the license of that module. An independent
    module is a module which is not derived from or based on this library.
    If you modify this library, you must extend this exception to your
    version of the library.

    libzmq is distributed in the hope that it will be useful, but WITHOUT
    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
    License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
29
30#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
31#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
32
33#include <string>
34#include <map>
35#include <stdarg.h>
36
37#include "own.hpp"
38#include "array.hpp"
39#include "blob.hpp"
40#include "stdint.hpp"
41#include "poller.hpp"
42#include "i_poll_events.hpp"
43#include "i_mailbox.hpp"
44#include "clock.hpp"
45#include "pipe.hpp"
46#include "endpoint.hpp"
47
//  C-linkage declaration of the event deallocation callback. It takes the
//  data pointer and an opaque hint pointer and returns nothing.
//  NOTE(review): presumably passed as the free function for monitor event
//  messages - confirm against the definition.
extern "C" {
void zmq_free_event (void *data_, void *hint_);
}
51
52namespace zmq
53{
//  Forward declarations: these types are only used through pointers in the
//  declarations below.
class ctx_t;
class msg_t;
class pipe_t;
57
58class socket_base_t : public own_t,
59 public array_item_t<>,
60 public i_poll_events,
61 public i_pipe_events
62{
63 friend class reaper_t;
64
65 public:
66 // Returns false if object is not a socket.
67 bool check_tag () const;
68
69 // Returns whether the socket is thread-safe.
70 bool is_thread_safe () const;
71
72 // Create a socket of a specified type.
73 static socket_base_t *
74 create (int type_, zmq::ctx_t *parent_, uint32_t tid_, int sid_);
75
76 // Returns the mailbox associated with this socket.
77 i_mailbox *get_mailbox () const;
78
79 // Interrupt blocking call if the socket is stuck in one.
80 // This function can be called from a different thread!
81 void stop ();
82
83 // Interface for communication with the API layer.
84 int setsockopt (int option_, const void *optval_, size_t optvallen_);
85 int getsockopt (int option_, void *optval_, size_t *optvallen_);
86 int bind (const char *endpoint_uri_);
87 int connect (const char *endpoint_uri_);
88 int term_endpoint (const char *endpoint_uri_);
89 int send (zmq::msg_t *msg_, int flags_);
90 int recv (zmq::msg_t *msg_, int flags_);
91 void add_signaler (signaler_t *s_);
92 void remove_signaler (signaler_t *s_);
93 int close ();
94
95 // These functions are used by the polling mechanism to determine
96 // which events are to be reported from this socket.
97 bool has_in ();
98 bool has_out ();
99
100 // Joining and leaving groups
101 int join (const char *group_);
102 int leave (const char *group_);
103
104 // Using this function reaper thread ask the socket to register with
105 // its poller.
106 void start_reaping (poller_t *poller_);
107
108 // i_poll_events implementation. This interface is used when socket
109 // is handled by the poller in the reaper thread.
110 void in_event ();
111 void out_event ();
112 void timer_event (int id_);
113
114 // i_pipe_events interface implementation.
115 void read_activated (pipe_t *pipe_);
116 void write_activated (pipe_t *pipe_);
117 void hiccuped (pipe_t *pipe_);
118 void pipe_terminated (pipe_t *pipe_);
119 void lock ();
120 void unlock ();
121
122 int monitor (const char *endpoint_,
123 uint64_t events_,
124 int event_version_,
125 int type_);
126
127 void event_connected (const endpoint_uri_pair_t &endpoint_uri_pair_,
128 zmq::fd_t fd_);
129 void event_connect_delayed (const endpoint_uri_pair_t &endpoint_uri_pair_,
130 int err_);
131 void event_connect_retried (const endpoint_uri_pair_t &endpoint_uri_pair_,
132 int interval_);
133 void event_listening (const endpoint_uri_pair_t &endpoint_uri_pair_,
134 zmq::fd_t fd_);
135 void event_bind_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
136 int err_);
137 void event_accepted (const endpoint_uri_pair_t &endpoint_uri_pair_,
138 zmq::fd_t fd_);
139 void event_accept_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
140 int err_);
141 void event_closed (const endpoint_uri_pair_t &endpoint_uri_pair_,
142 zmq::fd_t fd_);
143 void event_close_failed (const endpoint_uri_pair_t &endpoint_uri_pair_,
144 int err_);
145 void event_disconnected (const endpoint_uri_pair_t &endpoint_uri_pair_,
146 zmq::fd_t fd_);
147 void event_handshake_failed_no_detail (
148 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_);
149 void event_handshake_failed_protocol (
150 const endpoint_uri_pair_t &endpoint_uri_pair_, int err_);
151 void
152 event_handshake_failed_auth (const endpoint_uri_pair_t &endpoint_uri_pair_,
153 int err_);
154 void
155 event_handshake_succeeded (const endpoint_uri_pair_t &endpoint_uri_pair_,
156 int err_);
157
158 // Query the state of a specific peer. The default implementation
159 // always returns an ENOTSUP error.
160 virtual int get_peer_state (const void *routing_id_,
161 size_t routing_id_size_) const;
162
163 // Request for pipes statistics - will generate a ZMQ_EVENT_PIPES_STATS
164 // after gathering the data asynchronously. Requires event monitoring to
165 // be enabled.
166 int query_pipes_stats ();
167
168 protected:
169 socket_base_t (zmq::ctx_t *parent_,
170 uint32_t tid_,
171 int sid_,
172 bool thread_safe_ = false);
173 virtual ~socket_base_t ();
174
175 // Concrete algorithms for the x- methods are to be defined by
176 // individual socket types.
177 virtual void xattach_pipe (zmq::pipe_t *pipe_,
178 bool subscribe_to_all_ = false,
179 bool locally_initiated_ = false) = 0;
180
181 // The default implementation assumes there are no specific socket
182 // options for the particular socket type. If not so, override this
183 // method.
184 virtual int
185 xsetsockopt (int option_, const void *optval_, size_t optvallen_);
186
187 // The default implementation assumes that send is not supported.
188 virtual bool xhas_out ();
189 virtual int xsend (zmq::msg_t *msg_);
190
191 // The default implementation assumes that recv in not supported.
192 virtual bool xhas_in ();
193 virtual int xrecv (zmq::msg_t *msg_);
194
195 // i_pipe_events will be forwarded to these functions.
196 virtual void xread_activated (pipe_t *pipe_);
197 virtual void xwrite_activated (pipe_t *pipe_);
198 virtual void xhiccuped (pipe_t *pipe_);
199 virtual void xpipe_terminated (pipe_t *pipe_) = 0;
200
201 // the default implementation assumes that joub and leave are not supported.
202 virtual int xjoin (const char *group_);
203 virtual int xleave (const char *group_);
204
205 // Delay actual destruction of the socket.
206 void process_destroy ();
207
208 private:
209 // test if event should be sent and then dispatch it
210 void event (const endpoint_uri_pair_t &endpoint_uri_pair_,
211 uint64_t values_[],
212 uint64_t values_count_,
213 uint64_t type_);
214
215 // Socket event data dispatch
216 void monitor_event (uint64_t event_,
217 uint64_t values_[],
218 uint64_t values_count_,
219 const endpoint_uri_pair_t &endpoint_uri_pair_) const;
220
221 // Monitor socket cleanup
222 void stop_monitor (bool send_monitor_stopped_event_ = true);
223
224 // Creates new endpoint ID and adds the endpoint to the map.
225 void add_endpoint (const endpoint_uri_pair_t &endpoint_pair_,
226 own_t *endpoint_,
227 pipe_t *pipe_);
228
229 // Map of open endpoints.
230 typedef std::pair<own_t *, pipe_t *> endpoint_pipe_t;
231 typedef std::multimap<std::string, endpoint_pipe_t> endpoints_t;
232 endpoints_t _endpoints;
233
234 // Map of open inproc endpoints.
235 class inprocs_t
236 {
237 public:
238 void emplace (const char *endpoint_uri_, pipe_t *pipe_);
239 int erase_pipes (const std::string &endpoint_uri_str_);
240 void erase_pipe (pipe_t *pipe_);
241
242 private:
243 typedef std::multimap<std::string, pipe_t *> map_t;
244 map_t _inprocs;
245 };
246 inprocs_t _inprocs;
247
248 // To be called after processing commands or invoking any command
249 // handlers explicitly. If required, it will deallocate the socket.
250 void check_destroy ();
251
252 // Moves the flags from the message to local variables,
253 // to be later retrieved by getsockopt.
254 void extract_flags (msg_t *msg_);
255
256 // Used to check whether the object is a socket.
257 uint32_t _tag;
258
259 // If true, associated context was already terminated.
260 bool _ctx_terminated;
261
262 // If true, object should have been already destroyed. However,
263 // destruction is delayed while we unwind the stack to the point
264 // where it doesn't intersect the object being destroyed.
265 bool _destroyed;
266
267 // Parse URI string.
268 static int
269 parse_uri (const char *uri_, std::string &protocol_, std::string &path_);
270
271 // Check whether transport protocol, as specified in connect or
272 // bind, is available and compatible with the socket type.
273 int check_protocol (const std::string &protocol_) const;
274
275 // Register the pipe with this socket.
276 void attach_pipe (zmq::pipe_t *pipe_,
277 bool subscribe_to_all_ = false,
278 bool locally_initiated_ = false);
279
280 // Processes commands sent to this socket (if any). If timeout is -1,
281 // returns only after at least one command was processed.
282 // If throttle argument is true, commands are processed at most once
283 // in a predefined time period.
284 int process_commands (int timeout_, bool throttle_);
285
286 // Handlers for incoming commands.
287 void process_stop ();
288 void process_bind (zmq::pipe_t *pipe_);
289 void process_pipe_stats_publish (uint64_t outbound_queue_count_,
290 uint64_t inbound_queue_count_,
291 endpoint_uri_pair_t *endpoint_pair_);
292 void process_term (int linger_);
293 void process_term_endpoint (std::string *endpoint_);
294
295 void update_pipe_options (int option_);
296
297 std::string resolve_tcp_addr (std::string endpoint_uri_,
298 const char *tcp_address_);
299
300 // Socket's mailbox object.
301 i_mailbox *_mailbox;
302
303 // List of attached pipes.
304 typedef array_t<pipe_t, 3> pipes_t;
305 pipes_t _pipes;
306
307 // Reaper's poller and handle of this socket within it.
308 poller_t *_poller;
309 poller_t::handle_t _handle;
310
311 // Timestamp of when commands were processed the last time.
312 uint64_t _last_tsc;
313
314 // Number of messages received since last command processing.
315 int _ticks;
316
317 // True if the last message received had MORE flag set.
318 bool _rcvmore;
319
320 // Improves efficiency of time measurement.
321 clock_t _clock;
322
323 // Monitor socket;
324 void *_monitor_socket;
325
326 // Bitmask of events being monitored
327 int64_t _monitor_events;
328
329 // Last socket endpoint resolved URI
330 std::string _last_endpoint;
331
332 // Indicate if the socket is thread safe
333 const bool _thread_safe;
334
335 // Signaler to be used in the reaping stage
336 signaler_t *_reaper_signaler;
337
338 // Mutex for synchronize access to the socket in thread safe mode
339 mutex_t _sync;
340
341 // Mutex to synchronize access to the monitor Pair socket
342 mutex_t _monitor_sync;
343
344 ZMQ_NON_COPYABLE_NOR_MOVABLE (socket_base_t)
345};
346
347class routing_socket_base_t : public socket_base_t
348{
349 protected:
350 routing_socket_base_t (class ctx_t *parent_, uint32_t tid_, int sid_);
351 ~routing_socket_base_t ();
352
353 // methods from socket_base_t
354 virtual int
355 xsetsockopt (int option_, const void *optval_, size_t optvallen_);
356 virtual void xwrite_activated (pipe_t *pipe_);
357
358 // own methods
359 std::string extract_connect_routing_id ();
360 bool connect_routing_id_is_set () const;
361
362 struct out_pipe_t
363 {
364 pipe_t *pipe;
365 bool active;
366 };
367
368 void add_out_pipe (blob_t routing_id_, pipe_t *pipe_);
369 bool has_out_pipe (const blob_t &routing_id_) const;
370 out_pipe_t *lookup_out_pipe (const blob_t &routing_id_);
371 const out_pipe_t *lookup_out_pipe (const blob_t &routing_id_) const;
372 void erase_out_pipe (pipe_t *pipe_);
373 out_pipe_t try_erase_out_pipe (const blob_t &routing_id_);
374 template <typename Func> bool any_of_out_pipes (Func func_)
375 {
376 bool res = false;
377 for (out_pipes_t::iterator it = _out_pipes.begin ();
378 it != _out_pipes.end () && !res; ++it) {
379 res |= func_ (*it->second.pipe);
380 }
381
382 return res;
383 }
384
385 private:
386 // Outbound pipes indexed by the peer IDs.
387 typedef std::map<blob_t, out_pipe_t> out_pipes_t;
388 out_pipes_t _out_pipes;
389
390 // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
391 std::string _connect_routing_id;
392};
393}
394
395#endif
396