1/*
2 Copyright (c) 2018 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33SETUP_TEARDOWN_TESTCONTEXT
34
35const uint8_t unsubscribe_a_msg[] = {0, 'A'};
36const uint8_t subscribe_a_msg[] = {1, 'A'};
37const uint8_t subscribe_b_msg[] = {1, 'B'};
38
39const char test_endpoint[] = "inproc://soname";
40const char topic_a[] = "A";
41const char topic_b[] = "B";
42
43void test_xpub_verbose_one_sub ()
44{
45 void *pub = test_context_socket (ZMQ_XPUB);
46 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, test_endpoint));
47
48 void *sub = test_context_socket (ZMQ_SUB);
49 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, test_endpoint));
50
51 // Subscribe for A
52 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
53
54 // Receive subscriptions from subscriber
55 recv_array_expect_success (pub, subscribe_a_msg, 0);
56
57 // Subscribe socket for B instead
58 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_b, 1));
59
60 // Receive subscriptions from subscriber
61 recv_array_expect_success (pub, subscribe_b_msg, 0);
62
63 // Subscribe again for A again
64 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
65
66 // This time it is duplicated, so it will be filtered out
67 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
68
69 int verbose = 1;
70 TEST_ASSERT_SUCCESS_ERRNO (
71 zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int)));
72
73 // Subscribe socket for A again
74 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
75
76 // This time with VERBOSE the duplicated sub will be received
77 recv_array_expect_success (pub, subscribe_a_msg, 0);
78
79 // Sending A message and B Message
80 send_string_expect_success (pub, topic_a, 0);
81 send_string_expect_success (pub, topic_b, 0);
82
83 recv_string_expect_success (sub, topic_a, 0);
84 recv_string_expect_success (sub, topic_b, 0);
85
86 // Clean up.
87 test_context_socket_close (pub);
88 test_context_socket_close (sub);
89}
90
91void create_xpub_with_2_subs (void **pub_, void **sub0_, void **sub1_)
92{
93 *pub_ = test_context_socket (ZMQ_XPUB);
94 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (*pub_, test_endpoint));
95
96 *sub0_ = test_context_socket (ZMQ_SUB);
97 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (*sub0_, test_endpoint));
98
99 *sub1_ = test_context_socket (ZMQ_SUB);
100 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (*sub1_, test_endpoint));
101}
102
103void create_duplicate_subscription (void *pub_, void *sub0_, void *sub1_)
104{
105 // Subscribe for A
106 TEST_ASSERT_SUCCESS_ERRNO (
107 zmq_setsockopt (sub0_, ZMQ_SUBSCRIBE, topic_a, 1));
108
109 // Receive subscriptions from subscriber
110 recv_array_expect_success (pub_, subscribe_a_msg, 0);
111
112 // Subscribe again for A on the other socket
113 TEST_ASSERT_SUCCESS_ERRNO (
114 zmq_setsockopt (sub1_, ZMQ_SUBSCRIBE, topic_a, 1));
115
116 // This time it is duplicated, so it will be filtered out by XPUB
117 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub_, NULL, 0, ZMQ_DONTWAIT));
118}
119
120void test_xpub_verbose_two_subs ()
121{
122 void *pub, *sub0, *sub1;
123 create_xpub_with_2_subs (&pub, &sub0, &sub1);
124 create_duplicate_subscription (pub, sub0, sub1);
125
126 // Subscribe socket for B instead
127 TEST_ASSERT_SUCCESS_ERRNO (
128 zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, topic_b, 1));
129
130 // Receive subscriptions from subscriber
131 recv_array_expect_success (pub, subscribe_b_msg, 0);
132
133 int verbose = 1;
134 TEST_ASSERT_SUCCESS_ERRNO (
135 zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int)));
136
137 // Subscribe socket for A again
138 TEST_ASSERT_SUCCESS_ERRNO (
139 zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_a, 1));
140
141 // This time with VERBOSE the duplicated sub will be received
142 recv_array_expect_success (pub, subscribe_a_msg, 0);
143
144 // Sending A message and B Message
145 send_string_expect_success (pub, topic_a, 0);
146
147 send_string_expect_success (pub, topic_b, 0);
148
149 recv_string_expect_success (sub0, topic_a, 0);
150 recv_string_expect_success (sub1, topic_a, 0);
151 recv_string_expect_success (sub0, topic_b, 0);
152
153 // Clean up.
154 test_context_socket_close (pub);
155 test_context_socket_close (sub0);
156 test_context_socket_close (sub1);
157}
158
159void test_xpub_verboser_one_sub ()
160{
161 // Create a publisher
162 void *pub = test_context_socket (ZMQ_XPUB);
163 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, test_endpoint));
164
165 // Create a subscriber
166 void *sub = test_context_socket (ZMQ_SUB);
167 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, test_endpoint));
168
169 // Unsubscribe for A, does not exist yet
170 TEST_ASSERT_SUCCESS_ERRNO (
171 zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
172
173 // Does not exist, so it will be filtered out by XSUB
174 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
175
176 // Subscribe for A
177 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
178
179 // Receive subscriptions from subscriber
180 recv_array_expect_success (pub, subscribe_a_msg, 0);
181
182 // Subscribe again for A again, XSUB will increase refcount
183 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
184
185 // This time it is duplicated, so it will be filtered out by XPUB
186 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
187
188 // Unsubscribe for A, this time it exists in XPUB
189 TEST_ASSERT_SUCCESS_ERRNO (
190 zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
191
192 // XSUB refcounts and will not actually send unsub to PUB until the number
193 // of unsubs match the earlier subs
194 TEST_ASSERT_SUCCESS_ERRNO (
195 zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
196
197 // Receive unsubscriptions from subscriber
198 recv_array_expect_success (pub, unsubscribe_a_msg, 0);
199
200 // XSUB only sends the last and final unsub, so XPUB will only receive 1
201 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
202
203 // Unsubscribe for A, does not exist anymore
204 TEST_ASSERT_SUCCESS_ERRNO (
205 zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
206
207 // Does not exist, so it will be filtered out by XSUB
208 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
209
210 int verbose = 1;
211 TEST_ASSERT_SUCCESS_ERRNO (
212 zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int)));
213
214 // Subscribe socket for A again
215 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic_a, 1));
216
217 // Receive subscriptions from subscriber, did not exist anymore
218 recv_array_expect_success (pub, subscribe_a_msg, 0);
219
220 // Sending A message to make sure everything still works
221 send_string_expect_success (pub, topic_a, 0);
222
223 recv_string_expect_success (sub, topic_a, 0);
224
225 // Unsubscribe for A, this time it exists
226 TEST_ASSERT_SUCCESS_ERRNO (
227 zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
228
229 // Receive unsubscriptions from subscriber
230 recv_array_expect_success (pub, unsubscribe_a_msg, 0);
231
232 // Unsubscribe for A again, it does not exist anymore so XSUB will filter
233 TEST_ASSERT_SUCCESS_ERRNO (
234 zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic_a, 1));
235
236 // XSUB only sends unsub if it matched it in its trie, IOW: it will only
237 // send it if it existed in the first place even with XPUB_VERBBOSER
238 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
239
240 // Clean up.
241 test_context_socket_close (pub);
242 test_context_socket_close (sub);
243}
244
245void test_xpub_verboser_two_subs ()
246{
247 void *pub, *sub0, *sub1;
248 create_xpub_with_2_subs (&pub, &sub0, &sub1);
249 create_duplicate_subscription (pub, sub0, sub1);
250
251 // Unsubscribe for A, this time it exists in XPUB
252 TEST_ASSERT_SUCCESS_ERRNO (
253 zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, topic_a, 1));
254
255 // sub1 is still subscribed, so no notification
256 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
257
258 // Unsubscribe the second socket to trigger the notification
259 TEST_ASSERT_SUCCESS_ERRNO (
260 zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
261
262 // Receive unsubscriptions since all sockets are gone
263 recv_array_expect_success (pub, unsubscribe_a_msg, 0);
264
265 // Make really sure there is only one notification
266 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
267
268 int verbose = 1;
269 TEST_ASSERT_SUCCESS_ERRNO (
270 zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int)));
271
272 // Subscribe socket for A again
273 TEST_ASSERT_SUCCESS_ERRNO (
274 zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, topic_a, 1));
275
276 // Subscribe socket for A again
277 TEST_ASSERT_SUCCESS_ERRNO (
278 zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic_a, 1));
279
280 // Receive subscriptions from subscriber, did not exist anymore
281 recv_array_expect_success (pub, subscribe_a_msg, 0);
282
283 // VERBOSER is set, so subs from both sockets are received
284 recv_array_expect_success (pub, subscribe_a_msg, 0);
285
286 // Sending A message to make sure everything still works
287 send_string_expect_success (pub, topic_a, 0);
288
289 recv_string_expect_success (sub0, topic_a, 0);
290 recv_string_expect_success (sub1, topic_a, 0);
291
292 // Unsubscribe for A
293 TEST_ASSERT_SUCCESS_ERRNO (
294 zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
295
296 // Receive unsubscriptions from first subscriber due to VERBOSER
297 recv_array_expect_success (pub, unsubscribe_a_msg, 0);
298
299 // Unsubscribe for A again from the other socket
300 TEST_ASSERT_SUCCESS_ERRNO (
301 zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, topic_a, 1));
302
303 // Receive unsubscriptions from first subscriber due to VERBOSER
304 recv_array_expect_success (pub, unsubscribe_a_msg, 0);
305
306 // Unsubscribe again to make sure it gets filtered now
307 TEST_ASSERT_SUCCESS_ERRNO (
308 zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, topic_a, 1));
309
310 // Unmatched, so XSUB filters even with VERBOSER
311 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (pub, NULL, 0, ZMQ_DONTWAIT));
312
313 // Clean up.
314 test_context_socket_close (pub);
315 test_context_socket_close (sub0);
316 test_context_socket_close (sub1);
317}
318
319int main ()
320{
321 setup_test_environment ();
322
323 UNITY_BEGIN ();
324 RUN_TEST (test_xpub_verbose_one_sub);
325 RUN_TEST (test_xpub_verbose_two_subs);
326 RUN_TEST (test_xpub_verboser_one_sub);
327 RUN_TEST (test_xpub_verboser_two_subs);
328
329 return UNITY_END ();
330}
331