1 | /* |
2 | Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #ifndef __ZMQ_CTX_HPP_INCLUDED__ |
31 | #define __ZMQ_CTX_HPP_INCLUDED__ |
32 | |
33 | #include <map> |
34 | #include <vector> |
35 | #include <string> |
36 | #include <stdarg.h> |
37 | |
38 | #include "mailbox.hpp" |
39 | #include "array.hpp" |
40 | #include "config.hpp" |
41 | #include "mutex.hpp" |
42 | #include "stdint.hpp" |
43 | #include "options.hpp" |
44 | #include "atomic_counter.hpp" |
45 | #include "thread.hpp" |
46 | |
47 | namespace zmq |
48 | { |
49 | class object_t; |
50 | class io_thread_t; |
51 | class socket_base_t; |
52 | class reaper_t; |
53 | class pipe_t; |
54 | |
55 | // Information associated with inproc endpoint. Note that endpoint options |
56 | // are registered as well so that the peer can access them without a need |
57 | // for synchronisation, handshaking or similar. |
58 | struct endpoint_t |
59 | { |
60 | socket_base_t *socket; |
61 | options_t options; |
62 | }; |
63 | |
64 | class thread_ctx_t |
65 | { |
66 | public: |
67 | thread_ctx_t (); |
68 | |
69 | // Start a new thread with proper scheduling parameters. |
70 | void start_thread (thread_t &thread_, |
71 | thread_fn *tfn_, |
72 | void *arg_, |
73 | const char *name_ = NULL) const; |
74 | |
75 | int set (int option_, const void *optval_, size_t optvallen_); |
76 | int get (int option_, void *optval_, const size_t *optvallen_); |
77 | |
78 | protected: |
79 | // Synchronisation of access to context options. |
80 | mutex_t _opt_sync; |
81 | |
82 | private: |
83 | // Thread parameters. |
84 | int _thread_priority; |
85 | int _thread_sched_policy; |
86 | std::set<int> _thread_affinity_cpus; |
87 | std::string _thread_name_prefix; |
88 | }; |
89 | |
90 | // Context object encapsulates all the global state associated with |
91 | // the library. |
92 | |
93 | class ctx_t : public thread_ctx_t |
94 | { |
95 | public: |
96 | // Create the context object. |
97 | ctx_t (); |
98 | |
99 | // Returns false if object is not a context. |
100 | bool check_tag (); |
101 | |
102 | // This function is called when user invokes zmq_ctx_term. If there are |
103 | // no more sockets open it'll cause all the infrastructure to be shut |
104 | // down. If there are open sockets still, the deallocation happens |
105 | // after the last one is closed. |
106 | int terminate (); |
107 | |
108 | // This function starts the terminate process by unblocking any blocking |
109 | // operations currently in progress and stopping any more socket activity |
110 | // (except zmq_close). |
111 | // This function is non-blocking. |
112 | // terminate must still be called afterwards. |
113 | // This function is optional, terminate will unblock any current |
114 | // operations as well. |
115 | int shutdown (); |
116 | |
117 | // Set and get context properties. |
118 | int set (int option_, const void *optval_, size_t optvallen_); |
119 | int get (int option_, void *optval_, size_t *optvallen_); |
120 | int get (int option_); |
121 | |
122 | // Create and destroy a socket. |
123 | zmq::socket_base_t *create_socket (int type_); |
124 | void destroy_socket (zmq::socket_base_t *socket_); |
125 | |
126 | // Send command to the destination thread. |
127 | void send_command (uint32_t tid_, const command_t &command_); |
128 | |
129 | // Returns the I/O thread that is the least busy at the moment. |
130 | // Affinity specifies which I/O threads are eligible (0 = all). |
131 | // Returns NULL if no I/O thread is available. |
132 | zmq::io_thread_t *choose_io_thread (uint64_t affinity_); |
133 | |
134 | // Returns reaper thread object. |
135 | zmq::object_t *get_reaper (); |
136 | |
137 | // Management of inproc endpoints. |
138 | int register_endpoint (const char *addr_, const endpoint_t &endpoint_); |
139 | int unregister_endpoint (const std::string &addr_, socket_base_t *socket_); |
140 | void unregister_endpoints (zmq::socket_base_t *socket_); |
141 | endpoint_t find_endpoint (const char *addr_); |
142 | void pend_connection (const std::string &addr_, |
143 | const endpoint_t &endpoint_, |
144 | pipe_t **pipes_); |
145 | void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_); |
146 | |
147 | #ifdef ZMQ_HAVE_VMCI |
148 | // Return family for the VMCI socket or -1 if it's not available. |
149 | int get_vmci_socket_family (); |
150 | #endif |
151 | |
152 | enum |
153 | { |
154 | term_tid = 0, |
155 | reaper_tid = 1 |
156 | }; |
157 | |
158 | ~ctx_t (); |
159 | |
160 | bool valid () const; |
161 | |
162 | private: |
163 | bool start (); |
164 | |
165 | struct pending_connection_t |
166 | { |
167 | endpoint_t endpoint; |
168 | pipe_t *connect_pipe; |
169 | pipe_t *bind_pipe; |
170 | }; |
171 | |
172 | // Used to check whether the object is a context. |
173 | uint32_t _tag; |
174 | |
175 | // Sockets belonging to this context. We need the list so that |
176 | // we can notify the sockets when zmq_ctx_term() is called. |
177 | // The sockets will return ETERM then. |
178 | typedef array_t<socket_base_t> sockets_t; |
179 | sockets_t _sockets; |
180 | |
181 | // List of unused thread slots. |
182 | typedef std::vector<uint32_t> empty_slots_t; |
183 | empty_slots_t _empty_slots; |
184 | |
185 | // If true, zmq_init has been called but no socket has been created |
186 | // yet. Launching of I/O threads is delayed. |
187 | bool _starting; |
188 | |
189 | // If true, zmq_ctx_term was already called. |
190 | bool _terminating; |
191 | |
192 | // Synchronisation of accesses to global slot-related data: |
193 | // sockets, empty_slots, terminating. It also synchronises |
194 | // access to zombie sockets as such (as opposed to slots) and provides |
195 | // a memory barrier to ensure that all CPU cores see the same data. |
196 | mutex_t _slot_sync; |
197 | |
198 | // The reaper thread. |
199 | zmq::reaper_t *_reaper; |
200 | |
201 | // I/O threads. |
202 | typedef std::vector<zmq::io_thread_t *> io_threads_t; |
203 | io_threads_t _io_threads; |
204 | |
205 | // Array of pointers to mailboxes for both application and I/O threads. |
206 | std::vector<i_mailbox *> _slots; |
207 | |
208 | // Mailbox for zmq_ctx_term thread. |
209 | mailbox_t _term_mailbox; |
210 | |
211 | // List of inproc endpoints within this context. |
212 | typedef std::map<std::string, endpoint_t> endpoints_t; |
213 | endpoints_t _endpoints; |
214 | |
215 | // List of inproc connection endpoints pending a bind |
216 | typedef std::multimap<std::string, pending_connection_t> |
217 | pending_connections_t; |
218 | pending_connections_t _pending_connections; |
219 | |
220 | // Synchronisation of access to the list of inproc endpoints. |
221 | mutex_t _endpoints_sync; |
222 | |
223 | // Maximum socket ID. |
224 | static atomic_counter_t max_socket_id; |
225 | |
226 | // Maximum number of sockets that can be opened at the same time. |
227 | int _max_sockets; |
228 | |
229 | // Maximum allowed message size |
230 | int _max_msgsz; |
231 | |
232 | // Number of I/O threads to launch. |
233 | int _io_thread_count; |
234 | |
235 | // Does context wait (possibly forever) on termination? |
236 | bool _blocky; |
237 | |
238 | // Is IPv6 enabled on this context? |
239 | bool _ipv6; |
240 | |
241 | // Should we use zero copy message decoding in this context? |
242 | bool _zero_copy; |
243 | |
244 | ZMQ_NON_COPYABLE_NOR_MOVABLE (ctx_t) |
245 | |
246 | #ifdef HAVE_FORK |
247 | // the process that created this context. Used to detect forking. |
248 | pid_t _pid; |
249 | #endif |
250 | enum side |
251 | { |
252 | connect_side, |
253 | bind_side |
254 | }; |
255 | void |
256 | connect_inproc_sockets (zmq::socket_base_t *bind_socket_, |
257 | options_t &bind_options_, |
258 | const pending_connection_t &pending_connection_, |
259 | side side_); |
260 | |
261 | #ifdef ZMQ_HAVE_VMCI |
262 | int _vmci_fd; |
263 | int _vmci_family; |
264 | mutex_t _vmci_sync; |
265 | #endif |
266 | }; |
267 | } |
268 | |
269 | #endif |
270 | |