1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33#include <string.h>
34
35#ifndef _WIN32
36#include <sys/socket.h>
37#include <netinet/in.h>
38#include <arpa/inet.h>
39#include <unistd.h>
40#endif
41
// Generates the two Unity entry points for a test that is parameterised on
// the address family: <name>_ipv4 calls the test with false, and <name>_ipv6
// calls it with true after signalling "ignored" when IPv6 is not available.
#define MAKE_TEST_V4V6(test_fn_)                                               \
    static void test_fn_##_ipv4 ()                                             \
    {                                                                          \
        test_fn_ (false);                                                      \
    }                                                                          \
                                                                               \
    static void test_fn_##_ipv6 ()                                             \
    {                                                                          \
        if (!is_ipv6_available ()) {                                           \
            TEST_IGNORE_MESSAGE ("ipv6 is not available");                     \
        }                                                                      \
        test_fn_ (true);                                                       \
    }
53
// Test fixture hooks for this file.
// NOTE(review): presumably expands to the Unity setUp/tearDown pair that
// creates and destroys the context used by test_context_socket — confirm in
// testutil_unity.hpp.
SETUP_TEARDOWN_TESTCONTEXT
55
56void msg_send_expect_success (void *s_, const char *group_, const char *body_)
57{
58 zmq_msg_t msg;
59 const size_t len = strlen (body_);
60 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, len));
61
62 memcpy (zmq_msg_data (&msg), body_, len);
63
64 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_set_group (&msg, group_));
65
66 int rc = zmq_msg_send (&msg, s_, 0);
67 TEST_ASSERT_EQUAL_INT ((int) len, rc);
68
69 // TODO isn't the msg closed by zmq_msg_send?
70 zmq_msg_close (&msg);
71}
72
73void msg_recv_cmp (void *s_, const char *group_, const char *body_)
74{
75 zmq_msg_t msg;
76 const size_t len = strlen (body_);
77 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
78
79 int recv_rc = TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, s_, 0));
80 TEST_ASSERT_EQUAL_INT (len, recv_rc);
81
82 TEST_ASSERT_EQUAL_STRING (group_, zmq_msg_group (&msg));
83
84 TEST_ASSERT_EQUAL_STRING_LEN (body_, zmq_msg_data (&msg), len);
85
86 zmq_msg_close (&msg);
87}
88
89void test_leave_unjoined_fails ()
90{
91 void *dish = test_context_socket (ZMQ_DISH);
92
93 // Leaving a group which we didn't join
94 TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_leave (dish, "Movies"));
95
96 test_context_socket_close (dish);
97}
98
99void test_join_too_long_fails ()
100{
101 void *dish = test_context_socket (ZMQ_DISH);
102
103 // Joining too long group
104 char too_long_group[ZMQ_GROUP_MAX_LENGTH + 2];
105 for (int index = 0; index < ZMQ_GROUP_MAX_LENGTH + 2; index++)
106 too_long_group[index] = 'A';
107 too_long_group[ZMQ_GROUP_MAX_LENGTH + 1] = '\0';
108 TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, too_long_group));
109
110 test_context_socket_close (dish);
111}
112
113void test_join_twice_fails ()
114{
115 void *dish = test_context_socket (ZMQ_DISH);
116
117 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
118
119 // Duplicate Joining
120 TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, "Movies"));
121
122 test_context_socket_close (dish);
123}
124
125void test_radio_dish_tcp_poll (int ipv6_)
126{
127 size_t len = MAX_SOCKET_STRING;
128 char my_endpoint[MAX_SOCKET_STRING];
129
130 void *radio = test_context_socket (ZMQ_RADIO);
131 bind_loopback (radio, ipv6_, my_endpoint, len);
132
133 void *dish = test_context_socket (ZMQ_DISH);
134
135 TEST_ASSERT_SUCCESS_ERRNO (
136 zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
137
138 // Joining
139 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));
140
141 // Connecting
142 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dish, my_endpoint));
143
144 msleep (SETTLE_TIME);
145
146 // This is not going to be sent as dish only subscribe to "Movies"
147 msg_send_expect_success (radio, "TV", "Friends");
148
149 // This is going to be sent to the dish
150 msg_send_expect_success (radio, "Movies", "Godfather");
151
152 // Check the correct message arrived
153 msg_recv_cmp (dish, "Movies", "Godfather");
154
155 // Join group during connection optvallen
156 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
157
158 zmq_sleep (1);
159
160 // This should arrive now as we joined the group
161 msg_send_expect_success (radio, "TV", "Friends");
162
163 // Check the correct message arrived
164 msg_recv_cmp (dish, "TV", "Friends");
165
166 // Leaving group
167 TEST_ASSERT_SUCCESS_ERRNO (zmq_leave (dish, "TV"));
168
169 zmq_sleep (1);
170
171 // This is not going to be sent as dish only subscribe to "Movies"
172 msg_send_expect_success (radio, "TV", "Friends");
173
174 // This is going to be sent to the dish
175 msg_send_expect_success (radio, "Movies", "Godfather");
176
177 // test zmq_poll with dish
178 zmq_pollitem_t items[] = {
179 {radio, 0, ZMQ_POLLIN, 0}, // read publications
180 {dish, 0, ZMQ_POLLIN, 0}, // read subscriptions
181 };
182 int rc = zmq_poll (items, 2, 2000);
183 TEST_ASSERT_EQUAL_INT (1, rc);
184 TEST_ASSERT_EQUAL_INT (ZMQ_POLLIN, items[1].revents);
185
186 // Check the correct message arrived
187 msg_recv_cmp (dish, "Movies", "Godfather");
188
189 test_context_socket_close (dish);
190 test_context_socket_close (radio);
191}
192MAKE_TEST_V4V6 (test_radio_dish_tcp_poll)
193
194void test_dish_connect_fails (int ipv6_)
195{
196 void *dish = test_context_socket (ZMQ_DISH);
197
198 TEST_ASSERT_SUCCESS_ERRNO (
199 zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
200
201 const char *url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";
202
203 // Connecting dish should fail
204 TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO, zmq_connect (dish, url));
205
206 test_context_socket_close (dish);
207}
208MAKE_TEST_V4V6 (test_dish_connect_fails)
209
210void test_radio_bind_fails (int ipv6_)
211{
212 void *radio = test_context_socket (ZMQ_RADIO);
213
214 TEST_ASSERT_SUCCESS_ERRNO (
215 zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
216
217 // Connecting dish should fail
218 // Bind radio should fail
219 TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO,
220 zmq_bind (radio, "udp://*:5556"));
221
222 test_context_socket_close (radio);
223}
224MAKE_TEST_V4V6 (test_radio_bind_fails)
225
226void test_radio_dish_udp (int ipv6_)
227{
228 void *radio = test_context_socket (ZMQ_RADIO);
229 void *dish = test_context_socket (ZMQ_DISH);
230
231 TEST_ASSERT_SUCCESS_ERRNO (
232 zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
233 TEST_ASSERT_SUCCESS_ERRNO (
234 zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
235
236 const char *radio_url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556";
237
238 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, "udp://*:5556"));
239 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, radio_url));
240
241 msleep (SETTLE_TIME);
242
243 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
244
245 msg_send_expect_success (radio, "TV", "Friends");
246 msg_recv_cmp (dish, "TV", "Friends");
247
248 test_context_socket_close (dish);
249 test_context_socket_close (radio);
250}
251MAKE_TEST_V4V6 (test_radio_dish_udp)
252
// Multicast group addresses used by the multicast tests
#define MCAST_IPV4 "226.8.5.5"
#define MCAST_IPV6 "ff02::7a65:726f:6df1:0a01"

// Returns the UDP endpoint (port 5555) of the multicast group for the
// requested address family: the IPv6 group when ipv6_ is non-zero, the IPv4
// group otherwise. The returned string is a literal and must not be freed.
static const char *mcast_url (int ipv6_)
{
    return ipv6_ ? "udp://[" MCAST_IPV6 "]:5555" : "udp://" MCAST_IPV4 ":5555";
}
263
// OSX uses a different name for this socket option: where
// IPV6_ADD_MEMBERSHIP is not defined, fall back to IPV6_JOIN_GROUP so the
// IPv6 group join in is_multicast_available compiles there too.
#ifndef IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif
268
// Socket address storage that can be filled in through the family-specific
// member (ipv4 / ipv6) and then handed to bind/sendto through the generic
// sockaddr member.
union sa_u
{
    struct sockaddr generic;
    struct sockaddr_in ipv4;
    struct sockaddr_in6 ipv6;
};
275
276// Test if multicast is available on this machine by attempting to
277// send a receive a multicast datagram
278static bool is_multicast_available (int ipv6_)
279{
280 int family = ipv6_ ? AF_INET6 : AF_INET;
281 fd_t bind_sock = retired_fd;
282 fd_t send_sock = retired_fd;
283 int port = 5555;
284 bool success = false;
285 const char *msg = "it works";
286 char buf[32];
287 union sa_u any;
288 union sa_u mcast;
289 socklen_t sl;
290 int rc;
291
292 if (ipv6_) {
293 struct sockaddr_in6 *any_ipv6 = &any.ipv6;
294 struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6;
295
296 any_ipv6->sin6_family = AF_INET6;
297 any_ipv6->sin6_port = htons (port);
298 any_ipv6->sin6_flowinfo = 0;
299 any_ipv6->sin6_scope_id = 0;
300
301 rc = test_inet_pton (AF_INET6, "::", &any_ipv6->sin6_addr);
302 if (rc == 0) {
303 goto out;
304 }
305
306 *mcast_ipv6 = *any_ipv6;
307
308 rc = test_inet_pton (AF_INET6, MCAST_IPV6, &mcast_ipv6->sin6_addr);
309 if (rc == 0) {
310 goto out;
311 }
312
313 sl = sizeof (*any_ipv6);
314 } else {
315 struct sockaddr_in *any_ipv4 = &any.ipv4;
316 struct sockaddr_in *mcast_ipv4 = &mcast.ipv4;
317
318 any_ipv4->sin_family = AF_INET;
319 any_ipv4->sin_port = htons (5555);
320
321 rc = test_inet_pton (AF_INET, "0.0.0.0", &any_ipv4->sin_addr);
322 if (rc == 0) {
323 goto out;
324 }
325
326 *mcast_ipv4 = *any_ipv4;
327
328 rc = test_inet_pton (AF_INET, MCAST_IPV4, &mcast_ipv4->sin_addr);
329 if (rc == 0) {
330 goto out;
331 }
332
333 sl = sizeof (*any_ipv4);
334 }
335
336 bind_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
337 if (bind_sock < 0) {
338 goto out;
339 }
340
341 send_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP);
342 if (bind_sock < 0) {
343 goto out;
344 }
345
346 rc = bind (bind_sock, &any.generic, sl);
347 if (rc < 0) {
348 goto out;
349 }
350
351 if (ipv6_) {
352 struct ipv6_mreq mreq;
353 struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6;
354
355 mreq.ipv6mr_multiaddr = mcast_ipv6->sin6_addr;
356 mreq.ipv6mr_interface = 0;
357
358 rc = setsockopt (bind_sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
359 as_setsockopt_opt_t (&mreq), sizeof (mreq));
360 if (rc < 0) {
361 goto out;
362 }
363
364 int loop = 1;
365 rc = setsockopt (send_sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP,
366 as_setsockopt_opt_t (&loop), sizeof (loop));
367 if (rc < 0) {
368 goto out;
369 }
370 } else {
371 struct ip_mreq mreq;
372 struct sockaddr_in *mcast_ipv4 = &mcast.ipv4;
373
374 mreq.imr_multiaddr = mcast_ipv4->sin_addr;
375 mreq.imr_interface.s_addr = htonl (INADDR_ANY);
376
377 rc = setsockopt (bind_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
378 as_setsockopt_opt_t (&mreq), sizeof (mreq));
379 if (rc < 0) {
380 goto out;
381 }
382
383 int loop = 1;
384 rc = setsockopt (send_sock, IPPROTO_IP, IP_MULTICAST_LOOP,
385 as_setsockopt_opt_t (&loop), sizeof (loop));
386 if (rc < 0) {
387 goto out;
388 }
389 }
390
391 msleep (SETTLE_TIME);
392
393 rc = sendto (send_sock, msg, static_cast<socklen_t> (strlen (msg)), 0,
394 &mcast.generic, sl);
395 if (rc < 0) {
396 goto out;
397 }
398
399 msleep (SETTLE_TIME);
400
401 rc = recvfrom (bind_sock, buf, sizeof (buf) - 1, 0, NULL, 0);
402 if (rc < 0) {
403 goto out;
404 }
405
406 buf[rc] = '\0';
407
408 success = (strcmp (msg, buf) == 0);
409
410out:
411 if (bind_sock >= 0) {
412 close (bind_sock);
413 }
414
415 if (send_sock >= 0) {
416 close (send_sock);
417 }
418
419 return success;
420}
421
422static void ignore_if_unavailable (int ipv6_)
423{
424 if (ipv6_ && !is_ipv6_available ())
425 TEST_IGNORE_MESSAGE ("No IPV6 available");
426 if (!is_multicast_available (ipv6_))
427 TEST_IGNORE_MESSAGE ("No multicast available");
428}
429
430static void test_radio_dish_mcast (int ipv6_)
431{
432 ignore_if_unavailable (ipv6_);
433
434 void *radio = test_context_socket (ZMQ_RADIO);
435 void *dish = test_context_socket (ZMQ_DISH);
436
437 TEST_ASSERT_SUCCESS_ERRNO (
438 zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
439 TEST_ASSERT_SUCCESS_ERRNO (
440 zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
441
442 const char *url = mcast_url (ipv6_);
443
444 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
445 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));
446
447 msleep (SETTLE_TIME);
448
449 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
450
451 msg_send_expect_success (radio, "TV", "Friends");
452 msg_recv_cmp (dish, "TV", "Friends");
453
454 test_context_socket_close (dish);
455 test_context_socket_close (radio);
456}
457MAKE_TEST_V4V6 (test_radio_dish_mcast)
458
459static void test_radio_dish_no_loop (int ipv6_)
460{
461#ifdef _WIN32
462 TEST_IGNORE_MESSAGE (
463 "ZMQ_MULTICAST_LOOP=false does not appear to work on Windows (TODO)");
464#endif
465 ignore_if_unavailable (ipv6_);
466
467 void *radio = test_context_socket (ZMQ_RADIO);
468 void *dish = test_context_socket (ZMQ_DISH);
469
470 TEST_ASSERT_SUCCESS_ERRNO (
471 zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int)));
472 TEST_ASSERT_SUCCESS_ERRNO (
473 zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));
474
475 // Disable multicast loop for radio
476 int loop = 0;
477 TEST_ASSERT_SUCCESS_ERRNO (
478 zmq_setsockopt (radio, ZMQ_MULTICAST_LOOP, &loop, sizeof (int)));
479
480 const char *url = mcast_url (ipv6_);
481
482 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url));
483 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url));
484
485 msleep (SETTLE_TIME);
486
487 TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));
488
489 msg_send_expect_success (radio, "TV", "Friends");
490
491 // Looping is disabled, we shouldn't receive anything
492 msleep (SETTLE_TIME);
493
494 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (dish, NULL, 0, ZMQ_DONTWAIT));
495
496 test_context_socket_close (dish);
497 test_context_socket_close (radio);
498}
499MAKE_TEST_V4V6 (test_radio_dish_no_loop)
500
501int main (void)
502{
503 setup_test_environment ();
504
505 UNITY_BEGIN ();
506 RUN_TEST (test_leave_unjoined_fails);
507 RUN_TEST (test_join_too_long_fails);
508 RUN_TEST (test_join_twice_fails);
509 RUN_TEST (test_radio_bind_fails_ipv4);
510 RUN_TEST (test_radio_bind_fails_ipv6);
511 RUN_TEST (test_dish_connect_fails_ipv4);
512 RUN_TEST (test_dish_connect_fails_ipv6);
513 RUN_TEST (test_radio_dish_tcp_poll_ipv4);
514 RUN_TEST (test_radio_dish_tcp_poll_ipv6);
515 RUN_TEST (test_radio_dish_udp_ipv4);
516 RUN_TEST (test_radio_dish_udp_ipv6);
517
518 RUN_TEST (test_radio_dish_mcast_ipv4);
519 RUN_TEST (test_radio_dish_no_loop_ipv4);
520
521 RUN_TEST (test_radio_dish_mcast_ipv6);
522 RUN_TEST (test_radio_dish_no_loop_ipv6);
523
524 return UNITY_END ();
525}
526