1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33SETUP_TEARDOWN_TESTCONTEXT
34
35void test_basic ()
36{
37 // Create a publisher
38 void *pub = test_context_socket (ZMQ_XPUB);
39 int manual = 1;
40 TEST_ASSERT_SUCCESS_ERRNO (
41 zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4));
42 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
43
44 // Create a subscriber
45 void *sub = test_context_socket (ZMQ_XSUB);
46 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
47
48 // Subscribe for A
49 const char subscription[] = {1, 'A', 0};
50 send_string_expect_success (sub, subscription, 0);
51
52 // Receive subscriptions from subscriber
53 recv_string_expect_success (pub, subscription, 0);
54
55 // Subscribe socket for B instead
56 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "B", 1));
57
58 // Sending A message and B Message
59 send_string_expect_success (pub, "A", 0);
60 send_string_expect_success (pub, "B", 0);
61
62 recv_string_expect_success (sub, "B", ZMQ_DONTWAIT);
63
64 // Clean up.
65 test_context_socket_close (pub);
66 test_context_socket_close (sub);
67}
68
69void test_unsubscribe_manual ()
70{
71 // Create a publisher
72 void *pub = test_context_socket (ZMQ_XPUB);
73 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
74
75 // set pub socket options
76 int manual = 1;
77 TEST_ASSERT_SUCCESS_ERRNO (
78 zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, sizeof (manual)));
79
80 // Create a subscriber
81 void *sub = test_context_socket (ZMQ_XSUB);
82 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
83
84 // Subscribe for A
85 const uint8_t subscription1[] = {1, 'A'};
86 send_array_expect_success (sub, subscription1, 0);
87
88 // Subscribe for B
89 const uint8_t subscription2[] = {1, 'B'};
90 send_array_expect_success (sub, subscription2, 0);
91
92 char buffer[3];
93
94 // Receive subscription "A" from subscriber
95 recv_array_expect_success (pub, subscription1, 0);
96
97 // Subscribe socket for XA instead
98 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
99
100 // Receive subscription "B" from subscriber
101 recv_array_expect_success (pub, subscription2, 0);
102
103 // Subscribe socket for XB instead
104 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
105
106 // Unsubscribe from A
107 const uint8_t unsubscription1[2] = {0, 'A'};
108 send_array_expect_success (sub, unsubscription1, 0);
109
110 // Receive unsubscription "A" from subscriber
111 recv_array_expect_success (pub, unsubscription1, 0);
112
113 // Unsubscribe socket from XA instead
114 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
115
116 // Sending messages XA, XB
117 send_string_expect_success (pub, "XA", 0);
118 send_string_expect_success (pub, "XB", 0);
119
120 // Subscriber should receive XB only
121 recv_string_expect_success (sub, "XB", ZMQ_DONTWAIT);
122
123 // Close subscriber
124 test_context_socket_close (sub);
125
126 // Receive unsubscription "B"
127 const char unsubscription2[2] = {0, 'B'};
128 TEST_ASSERT_EQUAL_INT (
129 sizeof unsubscription2,
130 TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pub, buffer, sizeof buffer, 0)));
131 TEST_ASSERT_EQUAL_INT8_ARRAY (unsubscription2, buffer,
132 sizeof unsubscription2);
133
134 // Unsubscribe socket from XB instead
135 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XB", 2));
136
137 // Clean up.
138 test_context_socket_close (pub);
139}
140
141void test_xpub_proxy_unsubscribe_on_disconnect ()
142{
143 const uint8_t topic_buff[] = {"1"};
144 const uint8_t payload_buff[] = {"X"};
145
146 char my_endpoint_backend[MAX_SOCKET_STRING];
147 char my_endpoint_frontend[MAX_SOCKET_STRING];
148
149 int manual = 1;
150
151 // proxy frontend
152 void *xsub_proxy = test_context_socket (ZMQ_XSUB);
153 bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
154 sizeof my_endpoint_frontend);
155
156 // proxy backend
157 void *xpub_proxy = test_context_socket (ZMQ_XPUB);
158 TEST_ASSERT_SUCCESS_ERRNO (
159 zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4));
160 bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
161 sizeof my_endpoint_backend);
162
163 // publisher
164 void *pub = test_context_socket (ZMQ_PUB);
165 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
166
167 // first subscriber subscribes
168 void *sub1 = test_context_socket (ZMQ_SUB);
169 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
170 TEST_ASSERT_SUCCESS_ERRNO (
171 zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_buff, 1));
172
173 // wait
174 msleep (SETTLE_TIME);
175
176 // proxy reroutes and confirms subscriptions
177 const uint8_t subscription[2] = {1, *topic_buff};
178 recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
179 TEST_ASSERT_SUCCESS_ERRNO (
180 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
181 send_array_expect_success (xsub_proxy, subscription, 0);
182
183 // second subscriber subscribes
184 void *sub2 = test_context_socket (ZMQ_SUB);
185 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
186 TEST_ASSERT_SUCCESS_ERRNO (
187 zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic_buff, 1));
188
189 // wait
190 msleep (SETTLE_TIME);
191
192 // proxy reroutes
193 recv_array_expect_success (xpub_proxy, subscription, ZMQ_DONTWAIT);
194 TEST_ASSERT_SUCCESS_ERRNO (
195 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic_buff, 1));
196 send_array_expect_success (xsub_proxy, subscription, 0);
197
198 // wait
199 msleep (SETTLE_TIME);
200
201 // let publisher send a msg
202 send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
203 send_array_expect_success (pub, payload_buff, 0);
204
205 // wait
206 msleep (SETTLE_TIME);
207
208 // proxy reroutes data messages to subscribers
209 recv_array_expect_success (xsub_proxy, topic_buff, ZMQ_DONTWAIT);
210 recv_array_expect_success (xsub_proxy, payload_buff, ZMQ_DONTWAIT);
211 send_array_expect_success (xpub_proxy, topic_buff, ZMQ_SNDMORE);
212 send_array_expect_success (xpub_proxy, payload_buff, 0);
213
214 // wait
215 msleep (SETTLE_TIME);
216
217 // each subscriber should now get a message
218 recv_array_expect_success (sub2, topic_buff, ZMQ_DONTWAIT);
219 recv_array_expect_success (sub2, payload_buff, ZMQ_DONTWAIT);
220
221 recv_array_expect_success (sub1, topic_buff, ZMQ_DONTWAIT);
222 recv_array_expect_success (sub1, payload_buff, ZMQ_DONTWAIT);
223
224 // Disconnect both subscribers
225 test_context_socket_close (sub1);
226 test_context_socket_close (sub2);
227
228 // wait
229 msleep (SETTLE_TIME);
230
231 // unsubscribe messages are passed from proxy to publisher
232 const uint8_t unsubscription[] = {0, *topic_buff};
233 recv_array_expect_success (xpub_proxy, unsubscription, 0);
234 TEST_ASSERT_SUCCESS_ERRNO (
235 zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
236 send_array_expect_success (xsub_proxy, unsubscription, 0);
237
238 // should receive another unsubscribe msg
239 recv_array_expect_success (xpub_proxy, unsubscription, 0);
240 TEST_ASSERT_SUCCESS_ERRNO (
241 zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic_buff, 1));
242 send_array_expect_success (xsub_proxy, unsubscription, 0);
243
244 // wait
245 msleep (SETTLE_TIME);
246
247 // let publisher send a msg
248 send_array_expect_success (pub, topic_buff, ZMQ_SNDMORE);
249 send_array_expect_success (pub, payload_buff, 0);
250
251 // wait
252 msleep (SETTLE_TIME);
253
254 // nothing should come to the proxy
255 char buffer[1];
256 TEST_ASSERT_FAILURE_ERRNO (
257 EAGAIN, zmq_recv (xsub_proxy, buffer, sizeof buffer, ZMQ_DONTWAIT));
258
259 test_context_socket_close (pub);
260 test_context_socket_close (xpub_proxy);
261 test_context_socket_close (xsub_proxy);
262}
263
264void test_missing_subscriptions ()
265{
266 const char *topic1 = "1";
267 const char *topic2 = "2";
268 const char *payload = "X";
269
270 char my_endpoint_backend[MAX_SOCKET_STRING];
271 char my_endpoint_frontend[MAX_SOCKET_STRING];
272
273 int manual = 1;
274
275 // proxy frontend
276 void *xsub_proxy = test_context_socket (ZMQ_XSUB);
277 bind_loopback_ipv4 (xsub_proxy, my_endpoint_frontend,
278 sizeof my_endpoint_frontend);
279
280 // proxy backend
281 void *xpub_proxy = test_context_socket (ZMQ_XPUB);
282 TEST_ASSERT_SUCCESS_ERRNO (
283 zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4));
284 bind_loopback_ipv4 (xpub_proxy, my_endpoint_backend,
285 sizeof my_endpoint_backend);
286
287 // publisher
288 void *pub = test_context_socket (ZMQ_PUB);
289 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pub, my_endpoint_frontend));
290
291 // Here's the problem: because subscribers subscribe in quick succession,
292 // the proxy is unable to confirm the first subscription before receiving
293 // the second. This causes the first subscription to get lost.
294
295 // first subscriber
296 void *sub1 = test_context_socket (ZMQ_SUB);
297 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub1, my_endpoint_backend));
298 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1));
299
300 // second subscriber
301 void *sub2 = test_context_socket (ZMQ_SUB);
302 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub2, my_endpoint_backend));
303 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1));
304
305 // wait
306 msleep (SETTLE_TIME);
307
308 // proxy now reroutes and confirms subscriptions
309 const uint8_t subscription1[] = {1, static_cast<uint8_t> (topic1[0])};
310 recv_array_expect_success (xpub_proxy, subscription1, ZMQ_DONTWAIT);
311 TEST_ASSERT_SUCCESS_ERRNO (
312 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1));
313 send_array_expect_success (xsub_proxy, subscription1, 0);
314
315 const uint8_t subscription2[] = {1, static_cast<uint8_t> (topic2[0])};
316 recv_array_expect_success (xpub_proxy, subscription2, ZMQ_DONTWAIT);
317 TEST_ASSERT_SUCCESS_ERRNO (
318 zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1));
319 send_array_expect_success (xsub_proxy, subscription2, 0);
320
321 // wait
322 msleep (SETTLE_TIME);
323
324 // let publisher send 2 msgs, each with its own topic_buff
325 send_string_expect_success (pub, topic1, ZMQ_SNDMORE);
326 send_string_expect_success (pub, payload, 0);
327 send_string_expect_success (pub, topic2, ZMQ_SNDMORE);
328 send_string_expect_success (pub, payload, 0);
329
330 // wait
331 msleep (SETTLE_TIME);
332
333 // proxy reroutes data messages to subscribers
334 recv_string_expect_success (xsub_proxy, topic1, ZMQ_DONTWAIT);
335 recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
336 send_string_expect_success (xpub_proxy, topic1, ZMQ_SNDMORE);
337 send_string_expect_success (xpub_proxy, payload, 0);
338
339 recv_string_expect_success (xsub_proxy, topic2, ZMQ_DONTWAIT);
340 recv_string_expect_success (xsub_proxy, payload, ZMQ_DONTWAIT);
341 send_string_expect_success (xpub_proxy, topic2, ZMQ_SNDMORE);
342 send_string_expect_success (xpub_proxy, payload, 0);
343
344 // wait
345 msleep (SETTLE_TIME);
346
347 // each subscriber should now get a message
348 recv_string_expect_success (sub2, topic2, ZMQ_DONTWAIT);
349 recv_string_expect_success (sub2, payload, ZMQ_DONTWAIT);
350
351 recv_string_expect_success (sub1, topic1, ZMQ_DONTWAIT);
352 recv_string_expect_success (sub1, payload, ZMQ_DONTWAIT);
353
354 // Clean up
355 test_context_socket_close (sub1);
356 test_context_socket_close (sub2);
357 test_context_socket_close (pub);
358 test_context_socket_close (xpub_proxy);
359 test_context_socket_close (xsub_proxy);
360}
361
362void test_unsubscribe_cleanup ()
363{
364 char my_endpoint[MAX_SOCKET_STRING];
365
366 // Create a publisher
367 void *pub = test_context_socket (ZMQ_XPUB);
368 int manual = 1;
369 TEST_ASSERT_SUCCESS_ERRNO (
370 zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4));
371 bind_loopback_ipv4 (pub, my_endpoint, sizeof my_endpoint);
372
373 // Create a subscriber
374 void *sub = test_context_socket (ZMQ_XSUB);
375 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
376
377 // Subscribe for A
378 const uint8_t subscription1[2] = {1, 'A'};
379 send_array_expect_success (sub, subscription1, 0);
380
381
382 // Receive subscriptions from subscriber
383 recv_array_expect_success (pub, subscription1, 0);
384 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2));
385
386 // send 2 messages
387 send_string_expect_success (pub, "XA", 0);
388 send_string_expect_success (pub, "XB", 0);
389
390 // receive the single message
391 recv_string_expect_success (sub, "XA", 0);
392
393 // should be nothing left in the queue
394 char buffer[2];
395 TEST_ASSERT_FAILURE_ERRNO (
396 EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
397
398 // close the socket
399 test_context_socket_close (sub);
400
401 // closing the socket will result in an unsubscribe event
402 const uint8_t unsubscription[2] = {0, 'A'};
403 recv_array_expect_success (pub, unsubscription, 0);
404
405 // this doesn't really do anything
406 // there is no last_pipe set it will just fail silently
407 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2));
408
409 // reconnect
410 sub = test_context_socket (ZMQ_XSUB);
411 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, my_endpoint));
412
413 // send a subscription for B
414 const uint8_t subscription2[2] = {1, 'B'};
415 send_array_expect_success (sub, subscription2, 0);
416
417 // receive the subscription, overwrite it to XB
418 recv_array_expect_success (pub, subscription2, 0);
419 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2));
420
421 // send 2 messages
422 send_string_expect_success (pub, "XA", 0);
423 send_string_expect_success (pub, "XB", 0);
424
425 // receive the single message
426 recv_string_expect_success (sub, "XB", 0);
427
428 // should be nothing left in the queue
429 TEST_ASSERT_FAILURE_ERRNO (
430 EAGAIN, zmq_recv (sub, buffer, sizeof buffer, ZMQ_DONTWAIT));
431
432 // Clean up.
433 test_context_socket_close (pub);
434 test_context_socket_close (sub);
435}
436
437void test_user_message ()
438{
439 // Create a publisher
440 void *pub = test_context_socket (ZMQ_XPUB);
441 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
442
443 // Create a subscriber
444 void *sub = test_context_socket (ZMQ_XSUB);
445 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
446
447 // Send some data that is neither sub nor unsub
448 const char subscription[] = {2, 'A', 0};
449 send_string_expect_success (sub, subscription, 0);
450
451 // Receive subscriptions from subscriber
452 recv_string_expect_success (pub, subscription, 0);
453
454 // Clean up.
455 test_context_socket_close (pub);
456 test_context_socket_close (sub);
457}
458
459#ifdef ZMQ_ONLY_FIRST_SUBSCRIBE
460void test_user_message_multi ()
461{
462 const int only_first_subscribe = 1;
463
464 // Create a publisher
465 void *pub = test_context_socket (ZMQ_XPUB);
466 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
467 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_ONLY_FIRST_SUBSCRIBE,
468 &only_first_subscribe,
469 sizeof (only_first_subscribe)));
470
471 // Create a subscriber
472 void *sub = test_context_socket (ZMQ_XSUB);
473 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
474 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_ONLY_FIRST_SUBSCRIBE,
475 &only_first_subscribe,
476 sizeof (only_first_subscribe)));
477
478 // Send some data that is neither sub nor unsub
479 const uint8_t msg_common[] = {'A', 'B', 'C'};
480 // Message starts with 0 but should still treated as user
481 const uint8_t msg_0a[] = {0, 'B', 'C'};
482 const uint8_t msg_0b[] = {0, 'C', 'D'};
483 // Message starts with 1 but should still treated as user
484 const uint8_t msg_1a[] = {1, 'B', 'C'};
485 const uint8_t msg_1b[] = {1, 'C', 'D'};
486
487 // Test second message starting with 0
488 send_array_expect_success (sub, msg_common, ZMQ_SNDMORE);
489 send_array_expect_success (sub, msg_0a, 0);
490
491 // Receive messages from subscriber
492 recv_array_expect_success (pub, msg_common, 0);
493 recv_array_expect_success (pub, msg_0a, 0);
494
495 // Test second message starting with 1
496 send_array_expect_success (sub, msg_common, ZMQ_SNDMORE);
497 send_array_expect_success (sub, msg_1a, 0);
498
499 // Receive messages from subscriber
500 recv_array_expect_success (pub, msg_common, 0);
501 recv_array_expect_success (pub, msg_1a, 0);
502
503 // Test first message starting with 1
504 send_array_expect_success (sub, msg_1a, ZMQ_SNDMORE);
505 send_array_expect_success (sub, msg_1b, 0);
506 recv_array_expect_success (pub, msg_1a, 0);
507 recv_array_expect_success (pub, msg_1b, 0);
508
509 send_array_expect_success (sub, msg_0a, ZMQ_SNDMORE);
510 send_array_expect_success (sub, msg_0b, 0);
511 recv_array_expect_success (pub, msg_0a, 0);
512 recv_array_expect_success (pub, msg_0b, 0);
513
514 // Clean up.
515 test_context_socket_close (pub);
516 test_context_socket_close (sub);
517}
518#endif
519
520int main ()
521{
522 setup_test_environment ();
523
524 UNITY_BEGIN ();
525 RUN_TEST (test_basic);
526 RUN_TEST (test_unsubscribe_manual);
527 RUN_TEST (test_xpub_proxy_unsubscribe_on_disconnect);
528 RUN_TEST (test_missing_subscriptions);
529 RUN_TEST (test_unsubscribe_cleanup);
530 RUN_TEST (test_user_message);
531#ifdef ZMQ_ONLY_FIRST_SUBSCRIBE
532 RUN_TEST (test_user_message_multi);
533#endif
534
535 return UNITY_END ();
536}
537