1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include <string.h>
32
33#include "macros.hpp"
34#include "dish.hpp"
35#include "err.hpp"
36
37zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
38 socket_base_t (parent_, tid_, sid_, true),
39 _has_message (false)
40{
41 options.type = ZMQ_DISH;
42
43 // When socket is being closed down we don't want to wait till pending
44 // subscription commands are sent to the wire.
45 options.linger.store (0);
46
47 int rc = _message.init ();
48 errno_assert (rc == 0);
49}
50
51zmq::dish_t::~dish_t ()
52{
53 int rc = _message.close ();
54 errno_assert (rc == 0);
55}
56
57void zmq::dish_t::xattach_pipe (pipe_t *pipe_,
58 bool subscribe_to_all_,
59 bool locally_initiated_)
60{
61 LIBZMQ_UNUSED (subscribe_to_all_);
62 LIBZMQ_UNUSED (locally_initiated_);
63
64 zmq_assert (pipe_);
65 _fq.attach (pipe_);
66 _dist.attach (pipe_);
67
68 // Send all the cached subscriptions to the new upstream peer.
69 send_subscriptions (pipe_);
70}
71
72void zmq::dish_t::xread_activated (pipe_t *pipe_)
73{
74 _fq.activated (pipe_);
75}
76
77void zmq::dish_t::xwrite_activated (pipe_t *pipe_)
78{
79 _dist.activated (pipe_);
80}
81
82void zmq::dish_t::xpipe_terminated (pipe_t *pipe_)
83{
84 _fq.pipe_terminated (pipe_);
85 _dist.pipe_terminated (pipe_);
86}
87
88void zmq::dish_t::xhiccuped (pipe_t *pipe_)
89{
90 // Send all the cached subscriptions to the hiccuped pipe.
91 send_subscriptions (pipe_);
92}
93
94int zmq::dish_t::xjoin (const char *group_)
95{
96 std::string group = std::string (group_);
97
98 if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
99 errno = EINVAL;
100 return -1;
101 }
102
103 // User cannot join same group twice
104 if (!_subscriptions.insert (group).second) {
105 errno = EINVAL;
106 return -1;
107 }
108
109 msg_t msg;
110 int rc = msg.init_join ();
111 errno_assert (rc == 0);
112
113 rc = msg.set_group (group_);
114 errno_assert (rc == 0);
115
116 int err = 0;
117 rc = _dist.send_to_all (&msg);
118 if (rc != 0)
119 err = errno;
120 int rc2 = msg.close ();
121 errno_assert (rc2 == 0);
122 if (rc != 0)
123 errno = err;
124 return rc;
125}
126
127int zmq::dish_t::xleave (const char *group_)
128{
129 std::string group = std::string (group_);
130
131 if (group.length () > ZMQ_GROUP_MAX_LENGTH) {
132 errno = EINVAL;
133 return -1;
134 }
135
136 if (0 == _subscriptions.erase (group)) {
137 errno = EINVAL;
138 return -1;
139 }
140
141 msg_t msg;
142 int rc = msg.init_leave ();
143 errno_assert (rc == 0);
144
145 rc = msg.set_group (group_);
146 errno_assert (rc == 0);
147
148 int err = 0;
149 rc = _dist.send_to_all (&msg);
150 if (rc != 0)
151 err = errno;
152 int rc2 = msg.close ();
153 errno_assert (rc2 == 0);
154 if (rc != 0)
155 errno = err;
156 return rc;
157}
158
159int zmq::dish_t::xsend (msg_t *msg_)
160{
161 LIBZMQ_UNUSED (msg_);
162 errno = ENOTSUP;
163 return -1;
164}
165
166bool zmq::dish_t::xhas_out ()
167{
168 // Subscription can be added/removed anytime.
169 return true;
170}
171
172int zmq::dish_t::xrecv (msg_t *msg_)
173{
174 // If there's already a message prepared by a previous call to zmq_poll,
175 // return it straight ahead.
176 if (_has_message) {
177 const int rc = msg_->move (_message);
178 errno_assert (rc == 0);
179 _has_message = false;
180 return 0;
181 }
182
183 return xxrecv (msg_);
184}
185
186int zmq::dish_t::xxrecv (msg_t *msg_)
187{
188 do {
189 // Get a message using fair queueing algorithm.
190 const int rc = _fq.recv (msg_);
191
192 // If there's no message available, return immediately.
193 // The same when error occurs.
194 if (rc != 0)
195 return -1;
196
197 // Skip non matching messages
198 } while (0 == _subscriptions.count (std::string (msg_->group ())));
199
200 // Found a matching message
201 return 0;
202}
203
204bool zmq::dish_t::xhas_in ()
205{
206 // If there's already a message prepared by a previous call to zmq_poll,
207 // return straight ahead.
208 if (_has_message)
209 return true;
210
211 const int rc = xxrecv (&_message);
212 if (rc != 0) {
213 errno_assert (errno == EAGAIN);
214 return false;
215 }
216
217 // Matching message found
218 _has_message = true;
219 return true;
220}
221
222void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
223{
224 for (subscriptions_t::iterator it = _subscriptions.begin (),
225 end = _subscriptions.end ();
226 it != end; ++it) {
227 msg_t msg;
228 int rc = msg.init_join ();
229 errno_assert (rc == 0);
230
231 rc = msg.set_group (it->c_str ());
232 errno_assert (rc == 0);
233
234 // Send it to the pipe.
235 pipe_->write (&msg);
236 msg.close ();
237 }
238
239 pipe_->flush ();
240}
241
242zmq::dish_session_t::dish_session_t (io_thread_t *io_thread_,
243 bool connect_,
244 socket_base_t *socket_,
245 const options_t &options_,
246 address_t *addr_) :
247 session_base_t (io_thread_, connect_, socket_, options_, addr_),
248 _state (group)
249{
250}
251
252zmq::dish_session_t::~dish_session_t ()
253{
254}
255
256int zmq::dish_session_t::push_msg (msg_t *msg_)
257{
258 if (_state == group) {
259 if ((msg_->flags () & msg_t::more) != msg_t::more) {
260 errno = EFAULT;
261 return -1;
262 }
263
264 if (msg_->size () > ZMQ_GROUP_MAX_LENGTH) {
265 errno = EFAULT;
266 return -1;
267 }
268
269 _group_msg = *msg_;
270 _state = body;
271
272 int rc = msg_->init ();
273 errno_assert (rc == 0);
274 return 0;
275 }
276 const char *group_setting = msg_->group ();
277 int rc;
278 if (group_setting[0] != 0)
279 goto has_group;
280
281 // Set the message group
282 rc = msg_->set_group (static_cast<char *> (_group_msg.data ()),
283 _group_msg.size ());
284 errno_assert (rc == 0);
285
286 // We set the group, so we don't need the group_msg anymore
287 rc = _group_msg.close ();
288 errno_assert (rc == 0);
289has_group:
290 // Thread safe socket doesn't support multipart messages
291 if ((msg_->flags () & msg_t::more) == msg_t::more) {
292 errno = EFAULT;
293 return -1;
294 }
295
296 // Push message to dish socket
297 rc = session_base_t::push_msg (msg_);
298
299 if (rc == 0)
300 _state = group;
301
302 return rc;
303}
304
305int zmq::dish_session_t::pull_msg (msg_t *msg_)
306{
307 int rc = session_base_t::pull_msg (msg_);
308
309 if (rc != 0)
310 return rc;
311
312 if (!msg_->is_join () && !msg_->is_leave ())
313 return rc;
314
315 int group_length = static_cast<int> (strlen (msg_->group ()));
316
317 msg_t command;
318 int offset;
319
320 if (msg_->is_join ()) {
321 rc = command.init_size (group_length + 5);
322 errno_assert (rc == 0);
323 offset = 5;
324 memcpy (command.data (), "\4JOIN", 5);
325 } else {
326 rc = command.init_size (group_length + 6);
327 errno_assert (rc == 0);
328 offset = 6;
329 memcpy (command.data (), "\5LEAVE", 6);
330 }
331
332 command.set_flags (msg_t::command);
333 char *command_data = static_cast<char *> (command.data ());
334
335 // Copy the group
336 memcpy (command_data + offset, msg_->group (), group_length);
337
338 // Close the join message
339 rc = msg_->close ();
340 errno_assert (rc == 0);
341
342 *msg_ = command;
343
344 return 0;
345}
346
347void zmq::dish_session_t::reset ()
348{
349 session_base_t::reset ();
350 _state = group;
351}
352