1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33#include <stdlib.h>
34#include <string.h>
35
36SETUP_TEARDOWN_TESTCONTEXT
37
38// SHALL receive incoming messages from its peers using a fair-queuing
39// strategy.
40void test_fair_queue_in (const char *bind_address_)
41{
42 char connect_address[MAX_SOCKET_STRING];
43 void *receiver = test_context_socket (ZMQ_ROUTER);
44
45 int timeout = 250;
46 TEST_ASSERT_SUCCESS_ERRNO (
47 zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
48
49 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (receiver, bind_address_));
50 size_t len = MAX_SOCKET_STRING;
51 TEST_ASSERT_SUCCESS_ERRNO (
52 zmq_getsockopt (receiver, ZMQ_LAST_ENDPOINT, connect_address, &len));
53
54 const unsigned char services = 5;
55 void *senders[services];
56 for (unsigned char peer = 0; peer < services; ++peer) {
57 senders[peer] = test_context_socket (ZMQ_DEALER);
58
59 TEST_ASSERT_SUCCESS_ERRNO (
60 zmq_setsockopt (senders[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
61
62 char *str = strdup ("A");
63 str[0] += peer;
64 TEST_ASSERT_SUCCESS_ERRNO (
65 zmq_setsockopt (senders[peer], ZMQ_ROUTING_ID, str, 2));
66 free (str);
67
68 TEST_ASSERT_SUCCESS_ERRNO (
69 zmq_connect (senders[peer], connect_address));
70 }
71
72 msleep (SETTLE_TIME);
73
74 zmq_msg_t msg;
75 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
76
77 s_send_seq (senders[0], "M", SEQ_END);
78 s_recv_seq (receiver, "A", "M", SEQ_END);
79
80 s_send_seq (senders[0], "M", SEQ_END);
81 s_recv_seq (receiver, "A", "M", SEQ_END);
82
83 int sum = 0;
84
85 // send N requests
86 for (unsigned char peer = 0; peer < services; ++peer) {
87 s_send_seq (senders[peer], "M", SEQ_END);
88 sum += 'A' + peer;
89 }
90
91 TEST_ASSERT_EQUAL_INT (services * 'A' + services * (services - 1) / 2, sum);
92
93 // handle N requests
94 for (unsigned char peer = 0; peer < services; ++peer) {
95 TEST_ASSERT_EQUAL_INT (
96 2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, receiver, 0)));
97 const char *id = static_cast<const char *> (zmq_msg_data (&msg));
98 sum -= id[0];
99
100 s_recv_seq (receiver, "M", SEQ_END);
101 }
102
103 TEST_ASSERT_EQUAL_INT (0, sum);
104
105 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
106
107 test_context_socket_close_zero_linger (receiver);
108
109 for (size_t peer = 0; peer < services; ++peer)
110 test_context_socket_close_zero_linger (senders[peer]);
111
112 // Wait for disconnects.
113 msleep (SETTLE_TIME);
114}
115
116// SHALL create a double queue when a peer connects to it. If this peer
117// disconnects, the ROUTER socket SHALL destroy its double queue and SHALL
118// discard any messages it contains.
119void test_destroy_queue_on_disconnect (const char *bind_address_)
120{
121 void *a = test_context_socket (ZMQ_ROUTER);
122
123 int enabled = 1;
124 TEST_ASSERT_SUCCESS_ERRNO (
125 zmq_setsockopt (a, ZMQ_ROUTER_MANDATORY, &enabled, sizeof (enabled)));
126
127 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (a, bind_address_));
128 size_t len = MAX_SOCKET_STRING;
129 char connect_address[MAX_SOCKET_STRING];
130 TEST_ASSERT_SUCCESS_ERRNO (
131 zmq_getsockopt (a, ZMQ_LAST_ENDPOINT, connect_address, &len));
132
133 void *b = test_context_socket (ZMQ_DEALER);
134
135 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (b, ZMQ_ROUTING_ID, "B", 2));
136
137 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));
138
139 // Wait for connection.
140 msleep (SETTLE_TIME);
141
142 // Send a message in both directions
143 s_send_seq (a, "B", "ABC", SEQ_END);
144 s_send_seq (b, "DEF", SEQ_END);
145
146 TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (b, connect_address));
147
148 // Disconnect may take time and need command processing.
149 zmq_pollitem_t poller[2] = {{a, 0, 0, 0}, {b, 0, 0, 0}};
150 TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100));
151 TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100));
152
153 // No messages should be available, sending should fail.
154 zmq_msg_t msg;
155 zmq_msg_init (&msg);
156
157 TEST_ASSERT_FAILURE_ERRNO (
158 EHOSTUNREACH, zmq_send (a, "B", 2, ZMQ_SNDMORE | ZMQ_DONTWAIT));
159
160 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, a, ZMQ_DONTWAIT));
161
162 // After a reconnect of B, the messages should still be gone
163 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));
164
165 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, a, ZMQ_DONTWAIT));
166
167 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, b, ZMQ_DONTWAIT));
168
169 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
170
171 test_context_socket_close_zero_linger (a);
172 test_context_socket_close_zero_linger (b);
173
174 // Wait for disconnects.
175 msleep (SETTLE_TIME);
176}
177
178#define TEST_SUITE(name, bind_address) \
179 void test_fair_queue_in_##name () { test_fair_queue_in (bind_address); } \
180 void test_destroy_queue_on_disconnect_##name () \
181 { \
182 test_destroy_queue_on_disconnect (bind_address); \
183 }
184
185TEST_SUITE (inproc, "inproc://a")
186TEST_SUITE (tcp, "tcp://127.0.0.1:*")
187
188int main ()
189{
190 setup_test_environment ();
191
192 UNITY_BEGIN ();
193 RUN_TEST (test_fair_queue_in_tcp);
194 RUN_TEST (test_fair_queue_in_inproc);
195 // TODO commented out until libzmq implements this properly
196 // RUN_TEST (test_destroy_queue_on_disconnect_tcp);
197 // RUN_TEST (test_destroy_queue_on_disconnect_inproc);
198 return UNITY_END ();
199}
200