1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "testutil.hpp" |
31 | #include "testutil_unity.hpp" |
32 | |
33 | #include <string.h> |
34 | |
35 | #ifndef _WIN32 |
36 | #include <sys/socket.h> |
37 | #include <netinet/in.h> |
38 | #include <arpa/inet.h> |
39 | #include <unistd.h> |
40 | #endif |
41 | |
//  Helper macro to define the v4/v6 function pairs.
//
//  For a test body `void _test (int ipv6_)` this generates two argument-less
//  Unity entry points:
//    _test##_ipv4 ()  calls _test (false)
//    _test##_ipv6 ()  calls _test (true), after reporting the test as ignored
//                     when is_ipv6_available () is false.
//  NOTE(review): this relies on TEST_IGNORE_MESSAGE leaving the current test
//  so that _test (true) is not reached without IPv6 - confirm against the
//  Unity configuration in use.
#define MAKE_TEST_V4V6(_test)                                                  \
    static void _test##_ipv4 () { _test (false); }                             \
                                                                               \
    static void _test##_ipv6 ()                                                \
    {                                                                          \
        if (!is_ipv6_available ()) {                                           \
            TEST_IGNORE_MESSAGE ("ipv6 is not available");                     \
        }                                                                      \
        _test (true);                                                          \
    }
53 | |
//  Per-test fixture hooks for this file.
//  NOTE(review): the macro is defined outside this file (testutil_unity.hpp is
//  the likely home); it presumably provides Unity's setUp/tearDown around the
//  context used by test_context_socket () - confirm against that header.
SETUP_TEARDOWN_TESTCONTEXT
55 | |
56 | void msg_send_expect_success (void *s_, const char *group_, const char *body_) |
57 | { |
58 | zmq_msg_t msg; |
59 | const size_t len = strlen (body_); |
60 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, len)); |
61 | |
62 | memcpy (zmq_msg_data (&msg), body_, len); |
63 | |
64 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_set_group (&msg, group_)); |
65 | |
66 | int rc = zmq_msg_send (&msg, s_, 0); |
67 | TEST_ASSERT_EQUAL_INT ((int) len, rc); |
68 | |
69 | // TODO isn't the msg closed by zmq_msg_send? |
70 | zmq_msg_close (&msg); |
71 | } |
72 | |
73 | void msg_recv_cmp (void *s_, const char *group_, const char *body_) |
74 | { |
75 | zmq_msg_t msg; |
76 | const size_t len = strlen (body_); |
77 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); |
78 | |
79 | int recv_rc = TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, s_, 0)); |
80 | TEST_ASSERT_EQUAL_INT (len, recv_rc); |
81 | |
82 | TEST_ASSERT_EQUAL_STRING (group_, zmq_msg_group (&msg)); |
83 | |
84 | TEST_ASSERT_EQUAL_STRING_LEN (body_, zmq_msg_data (&msg), len); |
85 | |
86 | zmq_msg_close (&msg); |
87 | } |
88 | |
89 | void test_leave_unjoined_fails () |
90 | { |
91 | void *dish = test_context_socket (ZMQ_DISH); |
92 | |
93 | // Leaving a group which we didn't join |
94 | TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_leave (dish, "Movies" )); |
95 | |
96 | test_context_socket_close (dish); |
97 | } |
98 | |
99 | void test_join_too_long_fails () |
100 | { |
101 | void *dish = test_context_socket (ZMQ_DISH); |
102 | |
103 | // Joining too long group |
104 | char too_long_group[ZMQ_GROUP_MAX_LENGTH + 2]; |
105 | for (int index = 0; index < ZMQ_GROUP_MAX_LENGTH + 2; index++) |
106 | too_long_group[index] = 'A'; |
107 | too_long_group[ZMQ_GROUP_MAX_LENGTH + 1] = '\0'; |
108 | TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, too_long_group)); |
109 | |
110 | test_context_socket_close (dish); |
111 | } |
112 | |
113 | void test_join_twice_fails () |
114 | { |
115 | void *dish = test_context_socket (ZMQ_DISH); |
116 | |
117 | TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies" )); |
118 | |
119 | // Duplicate Joining |
120 | TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_join (dish, "Movies" )); |
121 | |
122 | test_context_socket_close (dish); |
123 | } |
124 | |
//  End-to-end RADIO/DISH scenario over a loopback endpoint obtained from
//  bind_loopback (): group filtering, joining and leaving a group while
//  connected, and readiness reporting through zmq_poll.
//  ipv6_ is forwarded to bind_loopback () and set as ZMQ_IPV6 on the dish.
//  The statement order (send / sleep / receive) is what the test verifies, so
//  it must not be rearranged.
void test_radio_dish_tcp_poll (int ipv6_)
{
    size_t len = MAX_SOCKET_STRING;
    char my_endpoint[MAX_SOCKET_STRING];

    //  The radio binds; bind_loopback fills my_endpoint for the dish to use
    void *radio = test_context_socket (ZMQ_RADIO);
    bind_loopback (radio, ipv6_, my_endpoint, len);

    void *dish = test_context_socket (ZMQ_DISH);

    TEST_ASSERT_SUCCESS_ERRNO (
      zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int)));

    //  Join "Movies" before connecting
    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "Movies"));

    //  Connect the dish to the radio's endpoint
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dish, my_endpoint));

    msleep (SETTLE_TIME);

    //  Not expected at the dish: it has only joined "Movies"
    msg_send_expect_success (radio, "TV", "Friends");

    //  Expected at the dish
    msg_send_expect_success (radio, "Movies", "Godfather");

    //  The first message received must be the "Movies" one, which shows the
    //  "TV" message was not delivered
    msg_recv_cmp (dish, "Movies", "Godfather");

    //  Join a second group while the connection is already established
    TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV"));

    //  Pause before sending so the join can take effect
    zmq_sleep (1);

    //  This should arrive now as we joined the group
    msg_send_expect_success (radio, "TV", "Friends");

    //  Check the correct message arrived
    msg_recv_cmp (dish, "TV", "Friends");

    //  Leave "TV" again
    TEST_ASSERT_SUCCESS_ERRNO (zmq_leave (dish, "TV"));

    //  Pause before sending so the leave can take effect
    zmq_sleep (1);

    //  Not expected at the dish: it is back to "Movies" only
    msg_send_expect_success (radio, "TV", "Friends");

    //  Expected at the dish
    msg_send_expect_success (radio, "Movies", "Godfather");

    //  Test zmq_poll with the dish: both sockets are polled for ZMQ_POLLIN,
    //  but exactly one item - the dish, items[1] - must be reported readable
    zmq_pollitem_t items[] = {
      {radio, 0, ZMQ_POLLIN, 0}, // items[0]: radio
      {dish, 0, ZMQ_POLLIN, 0},  // items[1]: dish
    };
    int rc = zmq_poll (items, 2, 2000);
    TEST_ASSERT_EQUAL_INT (1, rc);
    TEST_ASSERT_EQUAL_INT (ZMQ_POLLIN, items[1].revents);

    //  The readable message must be the "Movies" one
    msg_recv_cmp (dish, "Movies", "Godfather");

    test_context_socket_close (dish);
    test_context_socket_close (radio);
}
MAKE_TEST_V4V6 (test_radio_dish_tcp_poll)
193 | |
194 | void test_dish_connect_fails (int ipv6_) |
195 | { |
196 | void *dish = test_context_socket (ZMQ_DISH); |
197 | |
198 | TEST_ASSERT_SUCCESS_ERRNO ( |
199 | zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int))); |
200 | |
201 | const char *url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556" ; |
202 | |
203 | // Connecting dish should fail |
204 | TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO, zmq_connect (dish, url)); |
205 | |
206 | test_context_socket_close (dish); |
207 | } |
208 | MAKE_TEST_V4V6 (test_dish_connect_fails) |
209 | |
210 | void test_radio_bind_fails (int ipv6_) |
211 | { |
212 | void *radio = test_context_socket (ZMQ_RADIO); |
213 | |
214 | TEST_ASSERT_SUCCESS_ERRNO ( |
215 | zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int))); |
216 | |
217 | // Connecting dish should fail |
218 | // Bind radio should fail |
219 | TEST_ASSERT_FAILURE_ERRNO (ENOCOMPATPROTO, |
220 | zmq_bind (radio, "udp://*:5556" )); |
221 | |
222 | test_context_socket_close (radio); |
223 | } |
224 | MAKE_TEST_V4V6 (test_radio_bind_fails) |
225 | |
226 | void test_radio_dish_udp (int ipv6_) |
227 | { |
228 | void *radio = test_context_socket (ZMQ_RADIO); |
229 | void *dish = test_context_socket (ZMQ_DISH); |
230 | |
231 | TEST_ASSERT_SUCCESS_ERRNO ( |
232 | zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int))); |
233 | TEST_ASSERT_SUCCESS_ERRNO ( |
234 | zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int))); |
235 | |
236 | const char *radio_url = ipv6_ ? "udp://[::1]:5556" : "udp://127.0.0.1:5556" ; |
237 | |
238 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, "udp://*:5556" )); |
239 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, radio_url)); |
240 | |
241 | msleep (SETTLE_TIME); |
242 | |
243 | TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV" )); |
244 | |
245 | msg_send_expect_success (radio, "TV" , "Friends" ); |
246 | msg_recv_cmp (dish, "TV" , "Friends" ); |
247 | |
248 | test_context_socket_close (dish); |
249 | test_context_socket_close (radio); |
250 | } |
251 | MAKE_TEST_V4V6 (test_radio_dish_udp) |
252 | |
//  Multicast group addresses used by the multicast tests
#define MCAST_IPV4 "226.8.5.5"
#define MCAST_IPV6 "ff02::7a65:726f:6df1:0a01"

//  Returns the udp:// endpoint (port 5555) of the multicast group for the
//  requested family: the bracketed MCAST_IPV6 address when ipv6_ is non-zero,
//  the MCAST_IPV4 address otherwise. The result is a string literal.
static const char *mcast_url (int ipv6_)
{
    return ipv6_ ? "udp://[" MCAST_IPV6 "]:5555" : "udp://" MCAST_IPV4 ":5555";
}
263 | |
//  OSX uses a different name for this socket option: where the headers do
//  not define IPV6_ADD_MEMBERSHIP, alias it to IPV6_JOIN_GROUP so the
//  setsockopt call joining the IPv6 group can use a single name.
#ifndef IPV6_ADD_MEMBERSHIP
#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif
268 | |
//  Socket address storage that can be filled through its family-specific
//  member (ipv4 or ipv6) and then handed to the socket calls through the
//  generic member, as is_multicast_available () does for bind and sendto.
union sa_u
{
    struct sockaddr generic;  //  view passed to bind () / sendto ()
    struct sockaddr_in ipv4;  //  AF_INET layout
    struct sockaddr_in6 ipv6; //  AF_INET6 layout
};
275 | |
276 | // Test if multicast is available on this machine by attempting to |
277 | // send a receive a multicast datagram |
278 | static bool is_multicast_available (int ipv6_) |
279 | { |
280 | int family = ipv6_ ? AF_INET6 : AF_INET; |
281 | fd_t bind_sock = retired_fd; |
282 | fd_t send_sock = retired_fd; |
283 | int port = 5555; |
284 | bool success = false; |
285 | const char *msg = "it works" ; |
286 | char buf[32]; |
287 | union sa_u any; |
288 | union sa_u mcast; |
289 | socklen_t sl; |
290 | int rc; |
291 | |
292 | if (ipv6_) { |
293 | struct sockaddr_in6 *any_ipv6 = &any.ipv6; |
294 | struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6; |
295 | |
296 | any_ipv6->sin6_family = AF_INET6; |
297 | any_ipv6->sin6_port = htons (port); |
298 | any_ipv6->sin6_flowinfo = 0; |
299 | any_ipv6->sin6_scope_id = 0; |
300 | |
301 | rc = test_inet_pton (AF_INET6, "::" , &any_ipv6->sin6_addr); |
302 | if (rc == 0) { |
303 | goto out; |
304 | } |
305 | |
306 | *mcast_ipv6 = *any_ipv6; |
307 | |
308 | rc = test_inet_pton (AF_INET6, MCAST_IPV6, &mcast_ipv6->sin6_addr); |
309 | if (rc == 0) { |
310 | goto out; |
311 | } |
312 | |
313 | sl = sizeof (*any_ipv6); |
314 | } else { |
315 | struct sockaddr_in *any_ipv4 = &any.ipv4; |
316 | struct sockaddr_in *mcast_ipv4 = &mcast.ipv4; |
317 | |
318 | any_ipv4->sin_family = AF_INET; |
319 | any_ipv4->sin_port = htons (5555); |
320 | |
321 | rc = test_inet_pton (AF_INET, "0.0.0.0" , &any_ipv4->sin_addr); |
322 | if (rc == 0) { |
323 | goto out; |
324 | } |
325 | |
326 | *mcast_ipv4 = *any_ipv4; |
327 | |
328 | rc = test_inet_pton (AF_INET, MCAST_IPV4, &mcast_ipv4->sin_addr); |
329 | if (rc == 0) { |
330 | goto out; |
331 | } |
332 | |
333 | sl = sizeof (*any_ipv4); |
334 | } |
335 | |
336 | bind_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP); |
337 | if (bind_sock < 0) { |
338 | goto out; |
339 | } |
340 | |
341 | send_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP); |
342 | if (bind_sock < 0) { |
343 | goto out; |
344 | } |
345 | |
346 | rc = bind (bind_sock, &any.generic, sl); |
347 | if (rc < 0) { |
348 | goto out; |
349 | } |
350 | |
351 | if (ipv6_) { |
352 | struct ipv6_mreq mreq; |
353 | struct sockaddr_in6 *mcast_ipv6 = &mcast.ipv6; |
354 | |
355 | mreq.ipv6mr_multiaddr = mcast_ipv6->sin6_addr; |
356 | mreq.ipv6mr_interface = 0; |
357 | |
358 | rc = setsockopt (bind_sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, |
359 | as_setsockopt_opt_t (&mreq), sizeof (mreq)); |
360 | if (rc < 0) { |
361 | goto out; |
362 | } |
363 | |
364 | int loop = 1; |
365 | rc = setsockopt (send_sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, |
366 | as_setsockopt_opt_t (&loop), sizeof (loop)); |
367 | if (rc < 0) { |
368 | goto out; |
369 | } |
370 | } else { |
371 | struct ip_mreq mreq; |
372 | struct sockaddr_in *mcast_ipv4 = &mcast.ipv4; |
373 | |
374 | mreq.imr_multiaddr = mcast_ipv4->sin_addr; |
375 | mreq.imr_interface.s_addr = htonl (INADDR_ANY); |
376 | |
377 | rc = setsockopt (bind_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, |
378 | as_setsockopt_opt_t (&mreq), sizeof (mreq)); |
379 | if (rc < 0) { |
380 | goto out; |
381 | } |
382 | |
383 | int loop = 1; |
384 | rc = setsockopt (send_sock, IPPROTO_IP, IP_MULTICAST_LOOP, |
385 | as_setsockopt_opt_t (&loop), sizeof (loop)); |
386 | if (rc < 0) { |
387 | goto out; |
388 | } |
389 | } |
390 | |
391 | msleep (SETTLE_TIME); |
392 | |
393 | rc = sendto (send_sock, msg, static_cast<socklen_t> (strlen (msg)), 0, |
394 | &mcast.generic, sl); |
395 | if (rc < 0) { |
396 | goto out; |
397 | } |
398 | |
399 | msleep (SETTLE_TIME); |
400 | |
401 | rc = recvfrom (bind_sock, buf, sizeof (buf) - 1, 0, NULL, 0); |
402 | if (rc < 0) { |
403 | goto out; |
404 | } |
405 | |
406 | buf[rc] = '\0'; |
407 | |
408 | success = (strcmp (msg, buf) == 0); |
409 | |
410 | out: |
411 | if (bind_sock >= 0) { |
412 | close (bind_sock); |
413 | } |
414 | |
415 | if (send_sock >= 0) { |
416 | close (send_sock); |
417 | } |
418 | |
419 | return success; |
420 | } |
421 | |
422 | static void ignore_if_unavailable (int ipv6_) |
423 | { |
424 | if (ipv6_ && !is_ipv6_available ()) |
425 | TEST_IGNORE_MESSAGE ("No IPV6 available" ); |
426 | if (!is_multicast_available (ipv6_)) |
427 | TEST_IGNORE_MESSAGE ("No multicast available" ); |
428 | } |
429 | |
430 | static void test_radio_dish_mcast (int ipv6_) |
431 | { |
432 | ignore_if_unavailable (ipv6_); |
433 | |
434 | void *radio = test_context_socket (ZMQ_RADIO); |
435 | void *dish = test_context_socket (ZMQ_DISH); |
436 | |
437 | TEST_ASSERT_SUCCESS_ERRNO ( |
438 | zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int))); |
439 | TEST_ASSERT_SUCCESS_ERRNO ( |
440 | zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int))); |
441 | |
442 | const char *url = mcast_url (ipv6_); |
443 | |
444 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url)); |
445 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url)); |
446 | |
447 | msleep (SETTLE_TIME); |
448 | |
449 | TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV" )); |
450 | |
451 | msg_send_expect_success (radio, "TV" , "Friends" ); |
452 | msg_recv_cmp (dish, "TV" , "Friends" ); |
453 | |
454 | test_context_socket_close (dish); |
455 | test_context_socket_close (radio); |
456 | } |
457 | MAKE_TEST_V4V6 (test_radio_dish_mcast) |
458 | |
459 | static void test_radio_dish_no_loop (int ipv6_) |
460 | { |
461 | #ifdef _WIN32 |
462 | TEST_IGNORE_MESSAGE ( |
463 | "ZMQ_MULTICAST_LOOP=false does not appear to work on Windows (TODO)" ); |
464 | #endif |
465 | ignore_if_unavailable (ipv6_); |
466 | |
467 | void *radio = test_context_socket (ZMQ_RADIO); |
468 | void *dish = test_context_socket (ZMQ_DISH); |
469 | |
470 | TEST_ASSERT_SUCCESS_ERRNO ( |
471 | zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int))); |
472 | TEST_ASSERT_SUCCESS_ERRNO ( |
473 | zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int))); |
474 | |
475 | // Disable multicast loop for radio |
476 | int loop = 0; |
477 | TEST_ASSERT_SUCCESS_ERRNO ( |
478 | zmq_setsockopt (radio, ZMQ_MULTICAST_LOOP, &loop, sizeof (int))); |
479 | |
480 | const char *url = mcast_url (ipv6_); |
481 | |
482 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url)); |
483 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url)); |
484 | |
485 | msleep (SETTLE_TIME); |
486 | |
487 | TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV" )); |
488 | |
489 | msg_send_expect_success (radio, "TV" , "Friends" ); |
490 | |
491 | // Looping is disabled, we shouldn't receive anything |
492 | msleep (SETTLE_TIME); |
493 | |
494 | TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (dish, NULL, 0, ZMQ_DONTWAIT)); |
495 | |
496 | test_context_socket_close (dish); |
497 | test_context_socket_close (radio); |
498 | } |
499 | MAKE_TEST_V4V6 (test_radio_dish_no_loop) |
500 | |
501 | int main (void) |
502 | { |
503 | setup_test_environment (); |
504 | |
505 | UNITY_BEGIN (); |
506 | RUN_TEST (test_leave_unjoined_fails); |
507 | RUN_TEST (test_join_too_long_fails); |
508 | RUN_TEST (test_join_twice_fails); |
509 | RUN_TEST (test_radio_bind_fails_ipv4); |
510 | RUN_TEST (test_radio_bind_fails_ipv6); |
511 | RUN_TEST (test_dish_connect_fails_ipv4); |
512 | RUN_TEST (test_dish_connect_fails_ipv6); |
513 | RUN_TEST (test_radio_dish_tcp_poll_ipv4); |
514 | RUN_TEST (test_radio_dish_tcp_poll_ipv6); |
515 | RUN_TEST (test_radio_dish_udp_ipv4); |
516 | RUN_TEST (test_radio_dish_udp_ipv6); |
517 | |
518 | RUN_TEST (test_radio_dish_mcast_ipv4); |
519 | RUN_TEST (test_radio_dish_no_loop_ipv4); |
520 | |
521 | RUN_TEST (test_radio_dish_mcast_ipv6); |
522 | RUN_TEST (test_radio_dish_no_loop_ipv6); |
523 | |
524 | return UNITY_END (); |
525 | } |
526 | |