1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33SETUP_TEARDOWN_TESTCONTEXT
34
35void test_basic ()
36{
37 // Create a publisher
38 void *pub = test_context_socket (ZMQ_XPUB);
39 int manual = 1;
40 TEST_ASSERT_SUCCESS_ERRNO (
41 zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
42 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
43
44 // Create a subscriber
45 void *sub = test_context_socket (ZMQ_XSUB);
46 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
47
48 // Subscribe for A
49 const char subscription[] = {1, 'A', 0};
50 send_string_expect_success (sub, subscription, 0);
51
52 // Receive subscriptions from subscriber
53 recv_string_expect_success (pub, subscription, 0);
54
55 // Subscribe socket for B instead
56 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "B", 1));
57
58 // Sending A message and B Message
59 send_string_expect_success (pub, "A", 0);
60 send_string_expect_success (pub, "B", 0);
61
62 recv_string_expect_success (sub, "B", ZMQ_DONTWAIT);
63
64 // Clean up.
65 test_context_socket_close (pub);
66 test_context_socket_close (sub);
67}
68
69void test_unsubscribe_manual ()
70{
71 // Create a publisher
72 void *pub = test_context_socket (ZMQ_XPUB);
73 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
74
75 // set pub socket options
76 int manual = 1;
77 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE,
78 &manual, sizeof (manual)));
79
80 // Create a subscriber
81 void *sub = test_context_socket (ZMQ_XSUB);
82 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
83
84 // Subscribe for A
85 const uint8_t subscription1[] = {1, 'A'};
86 send_array_expect_success (sub, subscription1, 0);
87
88 // Subscribe for B
89 const uint8_t subscription2[] = {1, 'B'};
90 send_array_expect_success (sub, subscription2, 0);
91
92 char buffer[3];
93
94 // Receive subscription "A" from subscriber
95 recv_array_expect_success (pub, subscription1, 0);
96
97 // Subscribe socket for XA instead
98 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
99
100 // Receive subscription "B" from subscriber
101 recv_array_expect_success (pub, subscription2, 0);
102
103 // Subscribe socket for XB instead
104 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
105
106 // Unsubscribe from A
107 const uint8_t unsubscription1[2] = {0, 'A'};
108 send_array_expect_success (sub, unsubscription1, 0);
109
110 // Receive unsubscription "A" from subscriber
111 recv_array_expect_success (pub, unsubscription1, 0);
112
113 // Unsubscribe socket from XA instead
114 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
115
116 // Sending messages XA, XB
117 send_string_expect_success (pub, "XA", 0);
118 send_string_expect_success (pub, "XB", 0);
119
120 // Subscriber should receive XB only
121 recv_string_expect_success (sub, "XB", ZMQ_DONTWAIT);
122
123 // Close subscriber
124 test_context_socket_close (sub);
125
126 // Receive unsubscription "B"
127 const char unsubscription2[2] = {0, 'B'};
128 TEST_ASSERT_EQUAL_INT (
129 sizeof unsubscription2,
130 TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pub, buffer, sizeof buffer, 0)));
131 TEST_ASSERT_EQUAL_INT8_ARRAY (unsubscription2, buffer,
132 sizeof unsubscription2);
133
134 // Unsubscribe socket from XB instead
135 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XB", 2));
136
137 // Clean up.
138 test_context_socket_close (pub);
139}
140
141void test_xpub_proxy_unsubscribe_on_disconnect ()
142{
143 const uint8_t topic_buff[] = {"1"};
144 const uint8_t payload_buff[] = {"X"};
145
146 char my_endpoint_backend[MAX_SOCKET_STRING];
147 char my_endpoint_frontend[MAX_SOCKET_STRING];
148
149 int manual = 1;
150
151 // proxy frontend
152 void *xsub_proxy = test_context_socket (ZMQ_XSUB);
153 bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
154 sizeof my_endpoint_frontend);
155
156 // proxy backend
157 void *xpub_proxy = test_context_socket (ZMQ_XPUB);
158 TEST_ASSERT_SUCCESS_ERRNO (
159 zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
160 bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
161 sizeof my_endpoint_backend);
162
163 // publisher
164 void *pub = test_context_socket (ZMQ_PUB);
165 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
166
167 // first subscriber subscribes
168 void *sub1 = test_context_socket (ZMQ_SUB);
169 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
170 TEST_ASSERT_SUCCESS_ERRNO (
171 zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_buff, 1));
172
173 // wait
174 msleep (SETTLE_TIME);
175
176 // proxy reroutes and confirms subscriptions
177 const uint8_t subscription[2] = {1, *topic_buff};
178 recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
179 TEST_ASSERT_SUCCESS_ERRNO (
180 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
181 send_array_expect_success (xsub_proxy, subscription, 0);
182
183 // second subscriber subscribes
184 void *sub2 = test_context_socket (ZMQ_SUB);
185 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
186 TEST_ASSERT_SUCCESS_ERRNO (
187 zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic_buff, 1));
188
189 // wait
190 msleep (SETTLE_TIME);
191
192 // proxy reroutes
193 recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
194 TEST_ASSERT_SUCCESS_ERRNO (
195 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
196 send_array_expect_success (xsub_proxy, subscription, 0);
197
198 // wait
199 msleep (SETTLE_TIME);
200
201 // let publisher send a msg
202 send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
203 send_array_expect_success (pub, payload_buff, 0);
204
205 // wait
206 msleep (SETTLE_TIME);
207
208 // proxy reroutes data messages to subscribers
209 recv_array_expect_success (xsub_proxy, topic_buff, ZMQ_DONTWAIT);
210 recv_array_expect_success (xsub_proxy, payload_buff, ZMQ_DONTWAIT);
211
212 // send 2 messages
213 send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
214 send_array_expect_success (xpub_proxy, payload_buff, 0);
215 send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
216 send_array_expect_success (xpub_proxy, payload_buff, 0);
217
218 // wait
219 msleep (SETTLE_TIME);
220
221 // sub2 will get 2 messages because the last subscription is sub2.
222 recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
223 recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
224 recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
225 recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
226
227 recv_array_expect_success (sub1, topic_buff, ZMQ_DONTWAIT);
228 recv_array_expect_success (sub1, payload_buff, ZMQ_DONTWAIT);
229
230 // Disconnect both subscribers
231 test_context_socket_close (sub1);
232 test_context_socket_close (sub2);
233
234 // wait
235 msleep (SETTLE_TIME);
236
237 // unsubscribe messages are passed from proxy to publisher
238 const uint8_t unsubscription[] = {0, *topic_buff};
239 recv_array_expect_success (xpub_proxy, unsubscription, 0);
240 TEST_ASSERT_SUCCESS_ERRNO (
241 zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
242 send_array_expect_success (xsub_proxy, unsubscription, 0);
243
244 // should receive another unsubscribe msg
245 recv_array_expect_success (xpub_proxy, unsubscription, 0);
246 TEST_ASSERT_SUCCESS_ERRNO (
247 zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
248 send_array_expect_success (xsub_proxy, unsubscription, 0);
249
250 // wait
251 msleep (SETTLE_TIME);
252
253 // let publisher send a msg
254 send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
255 send_array_expect_success (pub, payload_buff, 0);
256
257 // wait
258 msleep (SETTLE_TIME);
259
260 // nothing should come to the proxy
261 char buffer[1];
262 TEST_ASSERT_FAILURE_ERRNO (
263 EAGAIN, zmq_recv (xsub_proxy, buffer, sizeof buffer, ZMQ_DONTWAIT));
264
265 test_context_socket_close (pub);
266 test_context_socket_close (xpub_proxy);
267 test_context_socket_close (xsub_proxy);
268}
269
270void test_missing_subscriptions ()
271{
272 const char *topic1 = "1";
273 const char *topic2 = "2";
274 const char *payload = "X";
275
276 char my_endpoint_backend[MAX_SOCKET_STRING];
277 char my_endpoint_frontend[MAX_SOCKET_STRING];
278
279 int manual = 1;
280
281 // proxy frontend
282 void *xsub_proxy = test_context_socket (ZMQ_XSUB);
283 bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
284 sizeof my_endpoint_frontend);
285
286 // proxy backend
287 void *xpub_proxy = test_context_socket (ZMQ_XPUB);
288 TEST_ASSERT_SUCCESS_ERRNO (
289 zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
290 bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
291 sizeof my_endpoint_backend);
292
293 // publisher
294 void *pub = test_context_socket (ZMQ_PUB);
295 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
296
297 // Here's the problem: because subscribers subscribe in quick succession,
298 // the proxy is unable to confirm the first subscription before receiving
299 // the second. This causes the first subscription to get lost.
300
301 // first subscriber
302 void *sub1 = test_context_socket (ZMQ_SUB);
303 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
304 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1));
305
306 // second subscriber
307 void *sub2 = test_context_socket (ZMQ_SUB);
308 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
309 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1));
310
311 // wait
312 msleep (SETTLE_TIME);
313
314 // proxy now reroutes and confirms subscriptions
315 const uint8_t subscription1[] = {1, static_cast<uint8_t> (topic1[0])};
316 recv_array_expect_success (xpub_proxy, subscription1, ZMQ_DONTWAIT);
317 TEST_ASSERT_SUCCESS_ERRNO (
318 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1));
319 send_array_expect_success (xsub_proxy, subscription1, 0);
320
321 const uint8_t subscription2[] = {1, static_cast<uint8_t> (topic2[0])};
322 recv_array_expect_success (xpub_proxy, subscription2, ZMQ_DONTWAIT);
323 TEST_ASSERT_SUCCESS_ERRNO (
324 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1));
325 send_array_expect_success (xsub_proxy, subscription2, 0);
326
327 // wait
328 msleep (SETTLE_TIME);
329
330 // let publisher send 2 msgs, each with its own topic_buff
331 send_string_expect_success (pub, topic1, ZMQ_SNDMORE);
332 send_string_expect_success (pub, payload, 0);
333 send_string_expect_success (pub, topic2, ZMQ_SNDMORE);
334 send_string_expect_success (pub, payload, 0);
335
336 // wait
337 msleep (SETTLE_TIME);
338
339 // proxy reroutes data messages to subscribers
340 recv_string_expect_success (xsub_proxy, topic1, ZMQ_DONTWAIT);
341 recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
342 send_string_expect_success (xpub_proxy, topic1, ZMQ_SNDMORE);
343 send_string_expect_success (xpub_proxy, payload, 0);
344
345 recv_string_expect_success (xsub_proxy, topic2, ZMQ_DONTWAIT);
346 recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
347 send_string_expect_success (xpub_proxy, topic2, ZMQ_SNDMORE);
348 send_string_expect_success (xpub_proxy, payload, 0);
349
350 // wait
351 msleep (SETTLE_TIME);
352
353 // only sub2 should now get a message
354 recv_string_expect_success (sub2, topic2, ZMQ_DONTWAIT);
355 recv_string_expect_success (sub2, payload, ZMQ_DONTWAIT);
356
357 //recv_string_expect_success (sub1, topic1, ZMQ_DONTWAIT);
358 //recv_string_expect_success (sub1, payload, ZMQ_DONTWAIT);
359
360 // Clean up
361 test_context_socket_close (sub1);
362 test_context_socket_close (sub2);
363 test_context_socket_close (pub);
364 test_context_socket_close (xpub_proxy);
365 test_context_socket_close (xsub_proxy);
366}
367
368void test_unsubscribe_cleanup ()
369{
370 char my_endpoint[MAX_SOCKET_STRING];
371
372 // Create a publisher
373 void *pub = test_context_socket (ZMQ_XPUB);
374 int manual = 1;
375 TEST_ASSERT_SUCCESS_ERRNO (
376 zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
377 bind_loopback_ipv4 (pub, my_endpoint, sizeof my_endpoint);
378
379 // Create a subscriber
380 void *sub = test_context_socket (ZMQ_XSUB);
381 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
382
383 // Subscribe for A
384 const uint8_t subscription1[2] = {1, 'A'};
385 send_array_expect_success (sub, subscription1, 0);
386
387
388 // Receive subscriptions from subscriber
389 recv_array_expect_success (pub, subscription1, 0);
390 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
391
392 // send 2 messages
393 send_string_expect_success (pub, "XA", 0);
394 send_string_expect_success (pub, "XB", 0);
395
396 // receive the single message
397 recv_string_expect_success (sub, "XA", 0);
398
399 // should be nothing left in the queue
400 char buffer[2];
401 TEST_ASSERT_FAILURE_ERRNO (
402 EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
403
404 // close the socket
405 test_context_socket_close (sub);
406
407 // closing the socket will result in an unsubscribe event
408 const uint8_t unsubscription[2] = {0, 'A'};
409 recv_array_expect_success (pub, unsubscription, 0);
410
411 // this doesn't really do anything
412 // there is no last_pipe set it will just fail silently
413 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
414
415 // reconnect
416 sub = test_context_socket (ZMQ_XSUB);
417 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
418
419 // send a subscription for B
420 const uint8_t subscription2[2] = {1, 'B'};
421 send_array_expect_success (sub, subscription2, 0);
422
423 // receive the subscription, overwrite it to XB
424 recv_array_expect_success (pub, subscription2, 0);
425 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
426
427 // send 2 messages
428 send_string_expect_success (pub, "XA", 0);
429 send_string_expect_success (pub, "XB", 0);
430
431 // receive the single message
432 recv_string_expect_success (sub, "XB", 0);
433
434 // should be nothing left in the queue
435 TEST_ASSERT_FAILURE_ERRNO (
436 EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
437
438 // Clean up.
439 test_context_socket_close (pub);
440 test_context_socket_close (sub);
441}
442
443void test_manual_last_value ()
444{
445 // Create a publisher
446 void *pub = test_context_socket (ZMQ_XPUB);
447
448 int hwm = 2000;
449 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SNDHWM, &hwm, 4));
450
451 // set pub socket options
452 int manual = 1;
453 TEST_ASSERT_SUCCESS_ERRNO (
454 zmq_setsockopt (pub, ZMQ_XPUB_MANUAL_LAST_VALUE, &manual, 4));
455
456 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
457
458 // Create a subscriber
459 void *sub = test_context_socket (ZMQ_SUB);
460 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
461
462 // Create another subscriber
463 void *sub2 = test_context_socket (ZMQ_SUB);
464 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, "inproc://soname"));
465
466 // Subscribe for "A".
467 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1));
468
469 const uint8_t subscription[2] = {1, 'A'};
470 // we must wait for the subscription to be processed here, otherwise some
471 // or all published messages might be lost
472 recv_array_expect_success (pub, subscription, 0);
473
474 // manual subscribe message
475 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "A", 1));
476 send_string_expect_success (pub, "A", 0);
477 recv_string_expect_success (sub, "A", 0);
478
479 // Subscribe for "A".
480 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, "A", 1));
481 recv_array_expect_success (pub, subscription, 0);
482 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "A", 1));
483 send_string_expect_success (pub, "A", 0);
484 recv_string_expect_success (sub2, "A", 0);
485
486 char buffer[255];
487 // sub won't get a message because the last subscription pipe is sub2.
488 TEST_ASSERT_FAILURE_ERRNO (
489 EAGAIN, zmq_recv (sub, buffer, sizeof (buffer), ZMQ_DONTWAIT));
490
491 // Clean up.
492 test_context_socket_close (pub);
493 test_context_socket_close (sub);
494 test_context_socket_close (sub2);
495}
496
497int main ()
498{
499 setup_test_environment ();
500
501 UNITY_BEGIN ();
502 RUN_TEST (test_basic);
503 RUN_TEST (test_unsubscribe_manual);
504 RUN_TEST (test_xpub_proxy_unsubscribe_on_disconnect);
505 RUN_TEST (test_missing_subscriptions);
506 RUN_TEST (test_unsubscribe_cleanup);
507 RUN_TEST (test_manual_last_value);
508
509 return UNITY_END ();
510}
511