1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include <string.h>
32
33#include "radio.hpp"
34#include "macros.hpp"
35#include "pipe.hpp"
36#include "err.hpp"
37#include "msg.hpp"
38
39zmq::radio_t::radio_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
40 socket_base_t (parent_, tid_, sid_, true),
41 _lossy (true)
42{
43 options.type = ZMQ_RADIO;
44}
45
46zmq::radio_t::~radio_t ()
47{
48}
49
50void zmq::radio_t::xattach_pipe (pipe_t *pipe_,
51 bool subscribe_to_all_,
52 bool locally_initiated_)
53{
54 LIBZMQ_UNUSED (subscribe_to_all_);
55 LIBZMQ_UNUSED (locally_initiated_);
56
57 zmq_assert (pipe_);
58
59 // Don't delay pipe termination as there is no one
60 // to receive the delimiter.
61 pipe_->set_nodelay ();
62
63 _dist.attach (pipe_);
64
65 if (subscribe_to_all_)
66 _udp_pipes.push_back (pipe_);
67 // The pipe is active when attached. Let's read the subscriptions from
68 // it, if any.
69 else
70 xread_activated (pipe_);
71}
72
73void zmq::radio_t::xread_activated (pipe_t *pipe_)
74{
75 // There are some subscriptions waiting. Let's process them.
76 msg_t msg;
77 while (pipe_->read (&msg)) {
78 // Apply the subscription to the trie
79 if (msg.is_join () || msg.is_leave ()) {
80 std::string group = std::string (msg.group ());
81
82 if (msg.is_join ())
83 _subscriptions.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (group),
84 pipe_);
85 else {
86 std::pair<subscriptions_t::iterator, subscriptions_t::iterator>
87 range = _subscriptions.equal_range (group);
88
89 for (subscriptions_t::iterator it = range.first;
90 it != range.second; ++it) {
91 if (it->second == pipe_) {
92 _subscriptions.erase (it);
93 break;
94 }
95 }
96 }
97 }
98 msg.close ();
99 }
100}
101
102void zmq::radio_t::xwrite_activated (pipe_t *pipe_)
103{
104 _dist.activated (pipe_);
105}
106int zmq::radio_t::xsetsockopt (int option_,
107 const void *optval_,
108 size_t optvallen_)
109{
110 if (optvallen_ != sizeof (int) || *static_cast<const int *> (optval_) < 0) {
111 errno = EINVAL;
112 return -1;
113 }
114 if (option_ == ZMQ_XPUB_NODROP)
115 _lossy = (*static_cast<const int *> (optval_) == 0);
116 else {
117 errno = EINVAL;
118 return -1;
119 }
120 return 0;
121}
122
123void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
124{
125 for (subscriptions_t::iterator it = _subscriptions.begin (),
126 end = _subscriptions.end ();
127 it != end;) {
128 if (it->second == pipe_) {
129#if __cplusplus >= 201103L
130 it = _subscriptions.erase (it);
131#else
132 _subscriptions.erase (it++);
133#endif
134 } else {
135 ++it;
136 }
137 }
138
139 {
140 const udp_pipes_t::iterator end = _udp_pipes.end ();
141 const udp_pipes_t::iterator it =
142 std::find (_udp_pipes.begin (), end, pipe_);
143 if (it != end)
144 _udp_pipes.erase (it);
145 }
146
147 _dist.pipe_terminated (pipe_);
148}
149
150int zmq::radio_t::xsend (msg_t *msg_)
151{
152 // Radio sockets do not allow multipart data (ZMQ_SNDMORE)
153 if (msg_->flags () & msg_t::more) {
154 errno = EINVAL;
155 return -1;
156 }
157
158 _dist.unmatch ();
159
160 std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
161 _subscriptions.equal_range (std::string (msg_->group ()));
162
163 for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
164 _dist.match (it->second);
165
166 for (udp_pipes_t::iterator it = _udp_pipes.begin (),
167 end = _udp_pipes.end ();
168 it != end; ++it)
169 _dist.match (*it);
170
171 int rc = -1;
172 if (_lossy || _dist.check_hwm ()) {
173 if (_dist.send_to_matching (msg_) == 0) {
174 rc = 0; // Yay, sent successfully
175 }
176 } else
177 errno = EAGAIN;
178
179 return rc;
180}
181
182bool zmq::radio_t::xhas_out ()
183{
184 return _dist.has_out ();
185}
186
187int zmq::radio_t::xrecv (msg_t *msg_)
188{
189 // Messages cannot be received from PUB socket.
190 LIBZMQ_UNUSED (msg_);
191 errno = ENOTSUP;
192 return -1;
193}
194
195bool zmq::radio_t::xhas_in ()
196{
197 return false;
198}
199
200zmq::radio_session_t::radio_session_t (io_thread_t *io_thread_,
201 bool connect_,
202 socket_base_t *socket_,
203 const options_t &options_,
204 address_t *addr_) :
205 session_base_t (io_thread_, connect_, socket_, options_, addr_),
206 _state (group)
207{
208}
209
210zmq::radio_session_t::~radio_session_t ()
211{
212}
213
214int zmq::radio_session_t::push_msg (msg_t *msg_)
215{
216 if (msg_->flags () & msg_t::command) {
217 char *command_data = static_cast<char *> (msg_->data ());
218 const size_t data_size = msg_->size ();
219
220 int group_length;
221 char *group;
222
223 msg_t join_leave_msg;
224 int rc;
225
226 // Set the msg type to either JOIN or LEAVE
227 if (data_size >= 5 && memcmp (command_data, "\4JOIN", 5) == 0) {
228 group_length = static_cast<int> (data_size) - 5;
229 group = command_data + 5;
230 rc = join_leave_msg.init_join ();
231 } else if (data_size >= 6 && memcmp (command_data, "\5LEAVE", 6) == 0) {
232 group_length = static_cast<int> (data_size) - 6;
233 group = command_data + 6;
234 rc = join_leave_msg.init_leave ();
235 }
236 // If it is not a JOIN or LEAVE just push the message
237 else
238 return session_base_t::push_msg (msg_);
239
240 errno_assert (rc == 0);
241
242 // Set the group
243 rc = join_leave_msg.set_group (group, group_length);
244 errno_assert (rc == 0);
245
246 // Close the current command
247 rc = msg_->close ();
248 errno_assert (rc == 0);
249
250 // Push the join or leave command
251 *msg_ = join_leave_msg;
252 return session_base_t::push_msg (msg_);
253 }
254 return session_base_t::push_msg (msg_);
255}
256
257int zmq::radio_session_t::pull_msg (msg_t *msg_)
258{
259 if (_state == group) {
260 int rc = session_base_t::pull_msg (&_pending_msg);
261 if (rc != 0)
262 return rc;
263
264 const char *group = _pending_msg.group ();
265 int length = static_cast<int> (strlen (group));
266
267 // First frame is the group
268 rc = msg_->init_size (length);
269 errno_assert (rc == 0);
270 msg_->set_flags (msg_t::more);
271 memcpy (msg_->data (), group, length);
272
273 // Next status is the body
274 _state = body;
275 return 0;
276 }
277 *msg_ = _pending_msg;
278 _state = group;
279 return 0;
280}
281
282void zmq::radio_session_t::reset ()
283{
284 session_base_t::reset ();
285 _state = group;
286}
287