1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include <string.h>
32
33#include "macros.hpp"
34#include "xsub.hpp"
35#include "err.hpp"
36
37zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
38 socket_base_t (parent_, tid_, sid_),
39 _has_message (false),
40 _more_send (false),
41 _more_recv (false),
42 _process_subscribe (false),
43 _only_first_subscribe (false)
44{
45 options.type = ZMQ_XSUB;
46
47 // When socket is being closed down we don't want to wait till pending
48 // subscription commands are sent to the wire.
49 options.linger.store (0);
50
51 int rc = _message.init ();
52 errno_assert (rc == 0);
53}
54
55zmq::xsub_t::~xsub_t ()
56{
57 int rc = _message.close ();
58 errno_assert (rc == 0);
59}
60
61void zmq::xsub_t::xattach_pipe (pipe_t *pipe_,
62 bool subscribe_to_all_,
63 bool locally_initiated_)
64{
65 LIBZMQ_UNUSED (subscribe_to_all_);
66 LIBZMQ_UNUSED (locally_initiated_);
67
68 zmq_assert (pipe_);
69 _fq.attach (pipe_);
70 _dist.attach (pipe_);
71
72 // Send all the cached subscriptions to the new upstream peer.
73 _subscriptions.apply (send_subscription, pipe_);
74 pipe_->flush ();
75}
76
77void zmq::xsub_t::xread_activated (pipe_t *pipe_)
78{
79 _fq.activated (pipe_);
80}
81
82void zmq::xsub_t::xwrite_activated (pipe_t *pipe_)
83{
84 _dist.activated (pipe_);
85}
86
87void zmq::xsub_t::xpipe_terminated (pipe_t *pipe_)
88{
89 _fq.pipe_terminated (pipe_);
90 _dist.pipe_terminated (pipe_);
91}
92
93void zmq::xsub_t::xhiccuped (pipe_t *pipe_)
94{
95 // Send all the cached subscriptions to the hiccuped pipe.
96 _subscriptions.apply (send_subscription, pipe_);
97 pipe_->flush ();
98}
99
100int zmq::xsub_t::xsetsockopt (int option_,
101 const void *optval_,
102 size_t optvallen_)
103{
104 if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) {
105 if (optvallen_ != sizeof (int)
106 || *static_cast<const int *> (optval_) < 0) {
107 errno = EINVAL;
108 return -1;
109 }
110 _only_first_subscribe = (*static_cast<const int *> (optval_) != 0);
111 return 0;
112 }
113 errno = EINVAL;
114 return -1;
115}
116
117int zmq::xsub_t::xsend (msg_t *msg_)
118{
119 size_t size = msg_->size ();
120 unsigned char *data = static_cast<unsigned char *> (msg_->data ());
121
122 bool first_part = !_more_send;
123 _more_send = (msg_->flags () & msg_t::more) != 0;
124
125 if (first_part) {
126 _process_subscribe = !_only_first_subscribe;
127 } else if (!_process_subscribe) {
128 // User message sent upstream to XPUB socket
129 return _dist.send_to_all (msg_);
130 }
131
132 if (msg_->is_subscribe () || (size > 0 && *data == 1)) {
133 // Process subscribe message
134 // This used to filter out duplicate subscriptions,
135 // however this is alread done on the XPUB side and
136 // doing it here as well breaks ZMQ_XPUB_VERBOSE
137 // when there are forwarding devices involved.
138 if (msg_->is_subscribe ()) {
139 data = static_cast<unsigned char *> (msg_->command_body ());
140 size = msg_->command_body_size ();
141 } else {
142 data = data + 1;
143 size = size - 1;
144 }
145 _subscriptions.add (data, size);
146 _process_subscribe = true;
147 return _dist.send_to_all (msg_);
148 }
149 if (msg_->is_cancel () || (size > 0 && *data == 0)) {
150 // Process unsubscribe message
151 if (msg_->is_cancel ()) {
152 data = static_cast<unsigned char *> (msg_->command_body ());
153 size = msg_->command_body_size ();
154 } else {
155 data = data + 1;
156 size = size - 1;
157 }
158 _process_subscribe = true;
159 if (_subscriptions.rm (data, size))
160 return _dist.send_to_all (msg_);
161 } else
162 // User message sent upstream to XPUB socket
163 return _dist.send_to_all (msg_);
164
165 int rc = msg_->close ();
166 errno_assert (rc == 0);
167 rc = msg_->init ();
168 errno_assert (rc == 0);
169
170 return 0;
171}
172
173bool zmq::xsub_t::xhas_out ()
174{
175 // Subscription can be added/removed anytime.
176 return true;
177}
178
179int zmq::xsub_t::xrecv (msg_t *msg_)
180{
181 // If there's already a message prepared by a previous call to zmq_poll,
182 // return it straight ahead.
183 if (_has_message) {
184 int rc = msg_->move (_message);
185 errno_assert (rc == 0);
186 _has_message = false;
187 _more_recv = (msg_->flags () & msg_t::more) != 0;
188 return 0;
189 }
190
191 // TODO: This can result in infinite loop in the case of continuous
192 // stream of non-matching messages which breaks the non-blocking recv
193 // semantics.
194 while (true) {
195 // Get a message using fair queueing algorithm.
196 int rc = _fq.recv (msg_);
197
198 // If there's no message available, return immediately.
199 // The same when error occurs.
200 if (rc != 0)
201 return -1;
202
203 // Check whether the message matches at least one subscription.
204 // Non-initial parts of the message are passed
205 if (_more_recv || !options.filter || match (msg_)) {
206 _more_recv = (msg_->flags () & msg_t::more) != 0;
207 return 0;
208 }
209
210 // Message doesn't match. Pop any remaining parts of the message
211 // from the pipe.
212 while (msg_->flags () & msg_t::more) {
213 rc = _fq.recv (msg_);
214 errno_assert (rc == 0);
215 }
216 }
217}
218
219bool zmq::xsub_t::xhas_in ()
220{
221 // There are subsequent parts of the partly-read message available.
222 if (_more_recv)
223 return true;
224
225 // If there's already a message prepared by a previous call to zmq_poll,
226 // return straight ahead.
227 if (_has_message)
228 return true;
229
230 // TODO: This can result in infinite loop in the case of continuous
231 // stream of non-matching messages.
232 while (true) {
233 // Get a message using fair queueing algorithm.
234 int rc = _fq.recv (&_message);
235
236 // If there's no message available, return immediately.
237 // The same when error occurs.
238 if (rc != 0) {
239 errno_assert (errno == EAGAIN);
240 return false;
241 }
242
243 // Check whether the message matches at least one subscription.
244 if (!options.filter || match (&_message)) {
245 _has_message = true;
246 return true;
247 }
248
249 // Message doesn't match. Pop any remaining parts of the message
250 // from the pipe.
251 while (_message.flags () & msg_t::more) {
252 rc = _fq.recv (&_message);
253 errno_assert (rc == 0);
254 }
255 }
256}
257
258bool zmq::xsub_t::match (msg_t *msg_)
259{
260 bool matching = _subscriptions.check (
261 static_cast<unsigned char *> (msg_->data ()), msg_->size ());
262
263 return matching ^ options.invert_matching;
264}
265
266void zmq::xsub_t::send_subscription (unsigned char *data_,
267 size_t size_,
268 void *arg_)
269{
270 pipe_t *pipe = static_cast<pipe_t *> (arg_);
271
272 // Create the subscription message.
273 msg_t msg;
274 int rc = msg.init_size (size_ + 1);
275 errno_assert (rc == 0);
276 unsigned char *data = static_cast<unsigned char *> (msg.data ());
277 data[0] = 1;
278
279 // We explicitly allow a NULL subscription with size zero
280 if (size_) {
281 assert (data_);
282 memcpy (data + 1, data_, size_);
283 }
284
285 // Send it to the pipe.
286 bool sent = pipe->write (&msg);
287 // If we reached the SNDHWM, and thus cannot send the subscription, drop
288 // the subscription message instead. This matches the behaviour of
289 // zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions
290 // when the SNDHWM is reached.
291 if (!sent)
292 msg.close ();
293}
294