1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "precompiled.hpp"
31#include <string.h>
32
33#include "xpub.hpp"
34#include "pipe.hpp"
35#include "err.hpp"
36#include "msg.hpp"
37#include "macros.hpp"
38#include "generic_mtrie_impl.hpp"
39
40zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
41 socket_base_t (parent_, tid_, sid_),
42 _verbose_subs (false),
43 _verbose_unsubs (false),
44 _more_send (false),
45 _more_recv (false),
46 _process_subscribe (false),
47 _only_first_subscribe (false),
48 _lossy (true),
49 _manual (false),
50 _send_last_pipe (false),
51 _pending_pipes (),
52 _welcome_msg ()
53{
54 _last_pipe = NULL;
55 options.type = ZMQ_XPUB;
56 _welcome_msg.init ();
57}
58
59zmq::xpub_t::~xpub_t ()
60{
61 _welcome_msg.close ();
62}
63
64void zmq::xpub_t::xattach_pipe (pipe_t *pipe_,
65 bool subscribe_to_all_,
66 bool locally_initiated_)
67{
68 LIBZMQ_UNUSED (locally_initiated_);
69
70 zmq_assert (pipe_);
71 _dist.attach (pipe_);
72
73 // If subscribe_to_all_ is specified, the caller would like to subscribe
74 // to all data on this pipe, implicitly.
75 if (subscribe_to_all_)
76 _subscriptions.add (NULL, 0, pipe_);
77
78 // if welcome message exists, send a copy of it
79 if (_welcome_msg.size () > 0) {
80 msg_t copy;
81 copy.init ();
82 int rc = copy.copy (_welcome_msg);
83 errno_assert (rc == 0);
84 bool ok = pipe_->write (&copy);
85 zmq_assert (ok);
86 pipe_->flush ();
87 }
88
89 // The pipe is active when attached. Let's read the subscriptions from
90 // it, if any.
91 xread_activated (pipe_);
92}
93
94void zmq::xpub_t::xread_activated (pipe_t *pipe_)
95{
96 // There are some subscriptions waiting. Let's process them.
97 msg_t msg;
98 while (pipe_->read (&msg)) {
99 metadata_t *metadata = msg.metadata ();
100 unsigned char *msg_data = static_cast<unsigned char *> (msg.data ()),
101 *data = NULL;
102 size_t size = 0;
103 bool subscribe = false;
104 bool is_subscribe_or_cancel = false;
105
106 bool first_part = !_more_recv;
107 _more_recv = (msg.flags () & msg_t::more) != 0;
108
109 if (first_part || _process_subscribe) {
110 // Apply the subscription to the trie
111 if (msg.is_subscribe () || msg.is_cancel ()) {
112 data = static_cast<unsigned char *> (msg.command_body ());
113 size = msg.command_body_size ();
114 subscribe = msg.is_subscribe ();
115 is_subscribe_or_cancel = true;
116 } else if (msg.size () > 0 && (*msg_data == 0 || *msg_data == 1)) {
117 data = msg_data + 1;
118 size = msg.size () - 1;
119 subscribe = *msg_data == 1;
120 is_subscribe_or_cancel = true;
121 }
122 }
123
124 if (first_part)
125 _process_subscribe =
126 !_only_first_subscribe || is_subscribe_or_cancel;
127
128 if (!is_subscribe_or_cancel) {
129 // Process user message coming upstream from xsub socket
130 _pending_data.push_back (blob_t (msg_data, msg.size ()));
131 if (metadata)
132 metadata->add_ref ();
133 _pending_metadata.push_back (metadata);
134 _pending_flags.push_back (msg.flags ());
135 msg.close ();
136 continue;
137 }
138
139 if (_manual) {
140 // Store manual subscription to use on termination
141 if (!subscribe)
142 _manual_subscriptions.rm (data, size, pipe_);
143 else
144 _manual_subscriptions.add (data, size, pipe_);
145
146 _pending_pipes.push_back (pipe_);
147
148 // ZMTP 3.1 hack: we need to support sub/cancel commands, but
149 // we can't give them back to userspace as it would be an API
150 // breakage since the payload of the message is completely
151 // different. Manually craft an old-style message instead.
152 data = data - 1;
153 size = size + 1;
154 if (subscribe)
155 *data = 1;
156 else
157 *data = 0;
158
159 _pending_data.push_back (blob_t (data, size));
160 if (metadata)
161 metadata->add_ref ();
162 _pending_metadata.push_back (metadata);
163 _pending_flags.push_back (0);
164 } else {
165 bool notify;
166 if (!subscribe) {
167 mtrie_t::rm_result rm_result =
168 _subscriptions.rm (data, size, pipe_);
169 // TODO reconsider what to do if rm_result == mtrie_t::not_found
170 notify = rm_result != mtrie_t::values_remain || _verbose_unsubs;
171 } else {
172 bool first_added = _subscriptions.add (data, size, pipe_);
173 notify = first_added || _verbose_subs;
174 }
175
176 // If the request was a new subscription, or the subscription
177 // was removed, or verbose mode is enabled, store it so that
178 // it can be passed to the user on next recv call.
179 if (options.type == ZMQ_XPUB && notify) {
180 data = data - 1;
181 size = size + 1;
182 if (subscribe)
183 *data = 1;
184 else
185 *data = 0;
186
187 _pending_data.push_back (blob_t (data, size));
188 if (metadata)
189 metadata->add_ref ();
190 _pending_metadata.push_back (metadata);
191 _pending_flags.push_back (0);
192 }
193 }
194 msg.close ();
195 }
196}
197
198void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
199{
200 _dist.activated (pipe_);
201}
202
203int zmq::xpub_t::xsetsockopt (int option_,
204 const void *optval_,
205 size_t optvallen_)
206{
207 if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER
208 || option_ == ZMQ_XPUB_MANUAL_LAST_VALUE || option_ == ZMQ_XPUB_NODROP
209 || option_ == ZMQ_XPUB_MANUAL || option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) {
210 if (optvallen_ != sizeof (int)
211 || *static_cast<const int *> (optval_) < 0) {
212 errno = EINVAL;
213 return -1;
214 }
215 if (option_ == ZMQ_XPUB_VERBOSE) {
216 _verbose_subs = (*static_cast<const int *> (optval_) != 0);
217 _verbose_unsubs = false;
218 } else if (option_ == ZMQ_XPUB_VERBOSER) {
219 _verbose_subs = (*static_cast<const int *> (optval_) != 0);
220 _verbose_unsubs = _verbose_subs;
221 } else if (option_ == ZMQ_XPUB_MANUAL_LAST_VALUE) {
222 _manual = (*static_cast<const int *> (optval_) != 0);
223 _send_last_pipe = _manual;
224 } else if (option_ == ZMQ_XPUB_NODROP)
225 _lossy = (*static_cast<const int *> (optval_) == 0);
226 else if (option_ == ZMQ_XPUB_MANUAL)
227 _manual = (*static_cast<const int *> (optval_) != 0);
228 else if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE)
229 _only_first_subscribe = (*static_cast<const int *> (optval_) != 0);
230 } else if (option_ == ZMQ_SUBSCRIBE && _manual) {
231 if (_last_pipe != NULL)
232 _subscriptions.add ((unsigned char *) optval_, optvallen_,
233 _last_pipe);
234 } else if (option_ == ZMQ_UNSUBSCRIBE && _manual) {
235 if (_last_pipe != NULL)
236 _subscriptions.rm ((unsigned char *) optval_, optvallen_,
237 _last_pipe);
238 } else if (option_ == ZMQ_XPUB_WELCOME_MSG) {
239 _welcome_msg.close ();
240
241 if (optvallen_ > 0) {
242 int rc = _welcome_msg.init_size (optvallen_);
243 errno_assert (rc == 0);
244
245 unsigned char *data =
246 static_cast<unsigned char *> (_welcome_msg.data ());
247 memcpy (data, optval_, optvallen_);
248 } else
249 _welcome_msg.init ();
250 } else {
251 errno = EINVAL;
252 return -1;
253 }
254 return 0;
255}
256
257static void stub (zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_)
258{
259 LIBZMQ_UNUSED (data_);
260 LIBZMQ_UNUSED (size_);
261 LIBZMQ_UNUSED (arg_);
262}
263
264void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
265{
266 if (_manual) {
267 // Remove the pipe from the trie and send corresponding manual
268 // unsubscriptions upstream.
269 _manual_subscriptions.rm (pipe_, send_unsubscription, this, false);
270 // Remove pipe without actually sending the message as it was taken
271 // care of by the manual call above. subscriptions is the real mtrie,
272 // so the pipe must be removed from there or it will be left over.
273 _subscriptions.rm (pipe_, stub, (void *) NULL, false);
274 } else {
275 // Remove the pipe from the trie. If there are topics that nobody
276 // is interested in anymore, send corresponding unsubscriptions
277 // upstream.
278 _subscriptions.rm (pipe_, send_unsubscription, this, !_verbose_unsubs);
279 }
280
281 _dist.pipe_terminated (pipe_);
282}
283
284void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self_)
285{
286 self_->_dist.match (pipe_);
287}
288
289void zmq::xpub_t::mark_last_pipe_as_matching (pipe_t *pipe_, xpub_t *self_)
290{
291 if (self_->_last_pipe == pipe_)
292 self_->_dist.match (pipe_);
293}
294
295int zmq::xpub_t::xsend (msg_t *msg_)
296{
297 bool msg_more = (msg_->flags () & msg_t::more) != 0;
298
299 // For the first part of multi-part message, find the matching pipes.
300 if (!_more_send) {
301 if (unlikely (_manual && _last_pipe && _send_last_pipe)) {
302 _subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
303 msg_->size (), mark_last_pipe_as_matching,
304 this);
305 _last_pipe = NULL;
306 } else
307 _subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
308 msg_->size (), mark_as_matching, this);
309 // If inverted matching is used, reverse the selection now
310 if (options.invert_matching) {
311 _dist.reverse_match ();
312 }
313 }
314
315 int rc = -1; // Assume we fail
316 if (_lossy || _dist.check_hwm ()) {
317 if (_dist.send_to_matching (msg_) == 0) {
318 // If we are at the end of multi-part message we can mark
319 // all the pipes as non-matching.
320 if (!msg_more)
321 _dist.unmatch ();
322 _more_send = msg_more;
323 rc = 0; // Yay, sent successfully
324 }
325 } else
326 errno = EAGAIN;
327 return rc;
328}
329
330bool zmq::xpub_t::xhas_out ()
331{
332 return _dist.has_out ();
333}
334
335int zmq::xpub_t::xrecv (msg_t *msg_)
336{
337 // If there is at least one
338 if (_pending_data.empty ()) {
339 errno = EAGAIN;
340 return -1;
341 }
342
343 // User is reading a message, set last_pipe and remove it from the deque
344 if (_manual && !_pending_pipes.empty ()) {
345 _last_pipe = _pending_pipes.front ();
346 _pending_pipes.pop_front ();
347 }
348
349 int rc = msg_->close ();
350 errno_assert (rc == 0);
351 rc = msg_->init_size (_pending_data.front ().size ());
352 errno_assert (rc == 0);
353 memcpy (msg_->data (), _pending_data.front ().data (),
354 _pending_data.front ().size ());
355
356 // set metadata only if there is some
357 if (metadata_t *metadata = _pending_metadata.front ()) {
358 msg_->set_metadata (metadata);
359 // Remove ref corresponding to vector placement
360 metadata->drop_ref ();
361 }
362
363 msg_->set_flags (_pending_flags.front ());
364 _pending_data.pop_front ();
365 _pending_metadata.pop_front ();
366 _pending_flags.pop_front ();
367 return 0;
368}
369
370bool zmq::xpub_t::xhas_in ()
371{
372 return !_pending_data.empty ();
373}
374
375void zmq::xpub_t::send_unsubscription (zmq::mtrie_t::prefix_t data_,
376 size_t size_,
377 xpub_t *self_)
378{
379 if (self_->options.type != ZMQ_PUB) {
380 // Place the unsubscription to the queue of pending (un)subscriptions
381 // to be retrieved by the user later on.
382 blob_t unsub (size_ + 1);
383 *unsub.data () = 0;
384 if (size_ > 0)
385 memcpy (unsub.data () + 1, data_, size_);
386 self_->_pending_data.ZMQ_PUSH_OR_EMPLACE_BACK (ZMQ_MOVE (unsub));
387 self_->_pending_metadata.push_back (NULL);
388 self_->_pending_flags.push_back (0);
389
390 if (self_->_manual) {
391 self_->_last_pipe = NULL;
392 self_->_pending_pipes.push_back (NULL);
393 }
394 }
395}
396