1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "precompiled.hpp" |
31 | #include <string.h> |
32 | |
33 | #include "macros.hpp" |
34 | #include "dish.hpp" |
35 | #include "err.hpp" |
36 | |
37 | zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) : |
38 | socket_base_t (parent_, tid_, sid_, true), |
39 | _has_message (false) |
40 | { |
41 | options.type = ZMQ_DISH; |
42 | |
43 | // When socket is being closed down we don't want to wait till pending |
44 | // subscription commands are sent to the wire. |
45 | options.linger.store (0); |
46 | |
47 | int rc = _message.init (); |
48 | errno_assert (rc == 0); |
49 | } |
50 | |
51 | zmq::dish_t::~dish_t () |
52 | { |
53 | int rc = _message.close (); |
54 | errno_assert (rc == 0); |
55 | } |
56 | |
57 | void zmq::dish_t::xattach_pipe (pipe_t *pipe_, |
58 | bool subscribe_to_all_, |
59 | bool locally_initiated_) |
60 | { |
61 | LIBZMQ_UNUSED (subscribe_to_all_); |
62 | LIBZMQ_UNUSED (locally_initiated_); |
63 | |
64 | zmq_assert (pipe_); |
65 | _fq.attach (pipe_); |
66 | _dist.attach (pipe_); |
67 | |
68 | // Send all the cached subscriptions to the new upstream peer. |
69 | send_subscriptions (pipe_); |
70 | } |
71 | |
72 | void zmq::dish_t::xread_activated (pipe_t *pipe_) |
73 | { |
74 | _fq.activated (pipe_); |
75 | } |
76 | |
77 | void zmq::dish_t::xwrite_activated (pipe_t *pipe_) |
78 | { |
79 | _dist.activated (pipe_); |
80 | } |
81 | |
82 | void zmq::dish_t::xpipe_terminated (pipe_t *pipe_) |
83 | { |
84 | _fq.pipe_terminated (pipe_); |
85 | _dist.pipe_terminated (pipe_); |
86 | } |
87 | |
88 | void zmq::dish_t::xhiccuped (pipe_t *pipe_) |
89 | { |
90 | // Send all the cached subscriptions to the hiccuped pipe. |
91 | send_subscriptions (pipe_); |
92 | } |
93 | |
94 | int zmq::dish_t::xjoin (const char *group_) |
95 | { |
96 | std::string group = std::string (group_); |
97 | |
98 | if (group.length () > ZMQ_GROUP_MAX_LENGTH) { |
99 | errno = EINVAL; |
100 | return -1; |
101 | } |
102 | |
103 | // User cannot join same group twice |
104 | if (!_subscriptions.insert (group).second) { |
105 | errno = EINVAL; |
106 | return -1; |
107 | } |
108 | |
109 | msg_t msg; |
110 | int rc = msg.init_join (); |
111 | errno_assert (rc == 0); |
112 | |
113 | rc = msg.set_group (group_); |
114 | errno_assert (rc == 0); |
115 | |
116 | int err = 0; |
117 | rc = _dist.send_to_all (&msg); |
118 | if (rc != 0) |
119 | err = errno; |
120 | int rc2 = msg.close (); |
121 | errno_assert (rc2 == 0); |
122 | if (rc != 0) |
123 | errno = err; |
124 | return rc; |
125 | } |
126 | |
127 | int zmq::dish_t::xleave (const char *group_) |
128 | { |
129 | std::string group = std::string (group_); |
130 | |
131 | if (group.length () > ZMQ_GROUP_MAX_LENGTH) { |
132 | errno = EINVAL; |
133 | return -1; |
134 | } |
135 | |
136 | if (0 == _subscriptions.erase (group)) { |
137 | errno = EINVAL; |
138 | return -1; |
139 | } |
140 | |
141 | msg_t msg; |
142 | int rc = msg.init_leave (); |
143 | errno_assert (rc == 0); |
144 | |
145 | rc = msg.set_group (group_); |
146 | errno_assert (rc == 0); |
147 | |
148 | int err = 0; |
149 | rc = _dist.send_to_all (&msg); |
150 | if (rc != 0) |
151 | err = errno; |
152 | int rc2 = msg.close (); |
153 | errno_assert (rc2 == 0); |
154 | if (rc != 0) |
155 | errno = err; |
156 | return rc; |
157 | } |
158 | |
159 | int zmq::dish_t::xsend (msg_t *msg_) |
160 | { |
161 | LIBZMQ_UNUSED (msg_); |
162 | errno = ENOTSUP; |
163 | return -1; |
164 | } |
165 | |
166 | bool zmq::dish_t::xhas_out () |
167 | { |
168 | // Subscription can be added/removed anytime. |
169 | return true; |
170 | } |
171 | |
172 | int zmq::dish_t::xrecv (msg_t *msg_) |
173 | { |
174 | // If there's already a message prepared by a previous call to zmq_poll, |
175 | // return it straight ahead. |
176 | if (_has_message) { |
177 | const int rc = msg_->move (_message); |
178 | errno_assert (rc == 0); |
179 | _has_message = false; |
180 | return 0; |
181 | } |
182 | |
183 | return xxrecv (msg_); |
184 | } |
185 | |
186 | int zmq::dish_t::xxrecv (msg_t *msg_) |
187 | { |
188 | do { |
189 | // Get a message using fair queueing algorithm. |
190 | const int rc = _fq.recv (msg_); |
191 | |
192 | // If there's no message available, return immediately. |
193 | // The same when error occurs. |
194 | if (rc != 0) |
195 | return -1; |
196 | |
197 | // Skip non matching messages |
198 | } while (0 == _subscriptions.count (std::string (msg_->group ()))); |
199 | |
200 | // Found a matching message |
201 | return 0; |
202 | } |
203 | |
204 | bool zmq::dish_t::xhas_in () |
205 | { |
206 | // If there's already a message prepared by a previous call to zmq_poll, |
207 | // return straight ahead. |
208 | if (_has_message) |
209 | return true; |
210 | |
211 | const int rc = xxrecv (&_message); |
212 | if (rc != 0) { |
213 | errno_assert (errno == EAGAIN); |
214 | return false; |
215 | } |
216 | |
217 | // Matching message found |
218 | _has_message = true; |
219 | return true; |
220 | } |
221 | |
222 | void zmq::dish_t::send_subscriptions (pipe_t *pipe_) |
223 | { |
224 | for (subscriptions_t::iterator it = _subscriptions.begin (), |
225 | end = _subscriptions.end (); |
226 | it != end; ++it) { |
227 | msg_t msg; |
228 | int rc = msg.init_join (); |
229 | errno_assert (rc == 0); |
230 | |
231 | rc = msg.set_group (it->c_str ()); |
232 | errno_assert (rc == 0); |
233 | |
234 | // Send it to the pipe. |
235 | pipe_->write (&msg); |
236 | msg.close (); |
237 | } |
238 | |
239 | pipe_->flush (); |
240 | } |
241 | |
242 | zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_, |
243 | bool connect_, |
244 | socket_base_t *socket_, |
245 | const options_t &options_, |
246 | address_t *addr_) : |
247 | session_base_t (io_thread_, connect_, socket_, options_, addr_), |
248 | _state (group) |
249 | { |
250 | } |
251 | |
252 | zmq::dish_session_t::~dish_session_t () |
253 | { |
254 | } |
255 | |
256 | int zmq::dish_session_t::push_msg (msg_t *msg_) |
257 | { |
258 | if (_state == group) { |
259 | if ((msg_->flags () & msg_t::more) != msg_t::more) { |
260 | errno = EFAULT; |
261 | return -1; |
262 | } |
263 | |
264 | if (msg_->size () > ZMQ_GROUP_MAX_LENGTH) { |
265 | errno = EFAULT; |
266 | return -1; |
267 | } |
268 | |
269 | _group_msg = *msg_; |
270 | _state = body; |
271 | |
272 | int rc = msg_->init (); |
273 | errno_assert (rc == 0); |
274 | return 0; |
275 | } |
276 | const char *group_setting = msg_->group (); |
277 | int rc; |
278 | if (group_setting[0] != 0) |
279 | goto has_group; |
280 | |
281 | // Set the message group |
282 | rc = msg_->set_group (static_cast<char *> (_group_msg.data ()), |
283 | _group_msg.size ()); |
284 | errno_assert (rc == 0); |
285 | |
286 | // We set the group, so we don't need the group_msg anymore |
287 | rc = _group_msg.close (); |
288 | errno_assert (rc == 0); |
289 | has_group: |
290 | // Thread safe socket doesn't support multipart messages |
291 | if ((msg_->flags () & msg_t::more) == msg_t::more) { |
292 | errno = EFAULT; |
293 | return -1; |
294 | } |
295 | |
296 | // Push message to dish socket |
297 | rc = session_base_t::push_msg (msg_); |
298 | |
299 | if (rc == 0) |
300 | _state = group; |
301 | |
302 | return rc; |
303 | } |
304 | |
305 | int zmq::dish_session_t::pull_msg (msg_t *msg_) |
306 | { |
307 | int rc = session_base_t::pull_msg (msg_); |
308 | |
309 | if (rc != 0) |
310 | return rc; |
311 | |
312 | if (!msg_->is_join () && !msg_->is_leave ()) |
313 | return rc; |
314 | |
315 | int group_length = static_cast<int> (strlen (msg_->group ())); |
316 | |
317 | msg_t command; |
318 | int offset; |
319 | |
320 | if (msg_->is_join ()) { |
321 | rc = command.init_size (group_length + 5); |
322 | errno_assert (rc == 0); |
323 | offset = 5; |
324 | memcpy (command.data (), "\4JOIN" , 5); |
325 | } else { |
326 | rc = command.init_size (group_length + 6); |
327 | errno_assert (rc == 0); |
328 | offset = 6; |
329 | memcpy (command.data (), "\5LEAVE" , 6); |
330 | } |
331 | |
332 | command.set_flags (msg_t::command); |
333 | char *command_data = static_cast<char *> (command.data ()); |
334 | |
335 | // Copy the group |
336 | memcpy (command_data + offset, msg_->group (), group_length); |
337 | |
338 | // Close the join message |
339 | rc = msg_->close (); |
340 | errno_assert (rc == 0); |
341 | |
342 | *msg_ = command; |
343 | |
344 | return 0; |
345 | } |
346 | |
347 | void zmq::dish_session_t::reset () |
348 | { |
349 | session_base_t::reset (); |
350 | _state = group; |
351 | } |
352 | |