1 | /* |
2 | Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "testutil.hpp" |
31 | #include "testutil_unity.hpp" |
32 | |
33 | #include <string.h> |
34 | |
35 | // NOTE: on OSX the endpoint returned by ZMQ_LAST_ENDPOINT may be quite long, |
36 | // ensure we have extra space for that: |
37 | #define SOCKET_STRING_LEN (MAX_SOCKET_STRING * 4) |
38 | |
39 | SETUP_TEARDOWN_TESTCONTEXT |
40 | |
41 | int test_defaults (int send_hwm_, int msg_cnt_, const char *endpoint_) |
42 | { |
43 | char pub_endpoint[SOCKET_STRING_LEN]; |
44 | |
45 | // Set up and bind XPUB socket |
46 | void *pub_socket = test_context_socket (ZMQ_XPUB); |
47 | test_bind (pub_socket, endpoint_, pub_endpoint, sizeof pub_endpoint); |
48 | |
49 | // Set up and connect SUB socket |
50 | void *sub_socket = test_context_socket (ZMQ_SUB); |
51 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, pub_endpoint)); |
52 | |
53 | //set a hwm on publisher |
54 | TEST_ASSERT_SUCCESS_ERRNO ( |
55 | zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm_, sizeof (send_hwm_))); |
56 | TEST_ASSERT_SUCCESS_ERRNO ( |
57 | zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0)); |
58 | |
59 | // Wait before starting TX operations till 1 subscriber has subscribed |
60 | // (in this test there's 1 subscriber only) |
61 | const char subscription_to_all_topics[] = {1, 0}; |
62 | recv_string_expect_success (pub_socket, subscription_to_all_topics, 0); |
63 | |
64 | // Send until we reach "mute" state |
65 | int send_count = 0; |
66 | while (send_count < msg_cnt_ |
67 | && zmq_send (pub_socket, "test message" , 13, ZMQ_DONTWAIT) == 13) |
68 | ++send_count; |
69 | |
70 | TEST_ASSERT_EQUAL_INT (send_hwm_, send_count); |
71 | msleep (SETTLE_TIME); |
72 | |
73 | // Now receive all sent messages |
74 | int recv_count = 0; |
75 | char dummybuff[64]; |
76 | while (13 == zmq_recv (sub_socket, &dummybuff, 64, ZMQ_DONTWAIT)) { |
77 | ++recv_count; |
78 | } |
79 | |
80 | TEST_ASSERT_EQUAL_INT (send_hwm_, recv_count); |
81 | |
82 | // Clean up |
83 | test_context_socket_close (sub_socket); |
84 | test_context_socket_close (pub_socket); |
85 | |
86 | return recv_count; |
87 | } |
88 | |
89 | int receive (void *socket_, int *is_termination_) |
90 | { |
91 | int recv_count = 0; |
92 | *is_termination_ = 0; |
93 | |
94 | // Now receive all sent messages |
95 | char buffer[255]; |
96 | int len; |
97 | while ((len = zmq_recv (socket_, buffer, sizeof (buffer), 0)) >= 0) { |
98 | ++recv_count; |
99 | |
100 | if (len == 3 && strncmp (buffer, "end" , len) == 0) { |
101 | *is_termination_ = 1; |
102 | return recv_count; |
103 | } |
104 | } |
105 | |
106 | return recv_count; |
107 | } |
108 | |
109 | int test_blocking (int send_hwm_, int msg_cnt_, const char *endpoint_) |
110 | { |
111 | char pub_endpoint[SOCKET_STRING_LEN]; |
112 | |
113 | // Set up bind socket |
114 | void *pub_socket = test_context_socket (ZMQ_XPUB); |
115 | test_bind (pub_socket, endpoint_, pub_endpoint, sizeof pub_endpoint); |
116 | |
117 | // Set up connect socket |
118 | void *sub_socket = test_context_socket (ZMQ_SUB); |
119 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, pub_endpoint)); |
120 | |
121 | //set a hwm on publisher |
122 | TEST_ASSERT_SUCCESS_ERRNO ( |
123 | zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm_, sizeof (send_hwm_))); |
124 | int wait = 1; |
125 | TEST_ASSERT_SUCCESS_ERRNO ( |
126 | zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof (wait))); |
127 | int timeout_ms = 10; |
128 | TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( |
129 | sub_socket, ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms))); |
130 | TEST_ASSERT_SUCCESS_ERRNO ( |
131 | zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0)); |
132 | |
133 | // Wait before starting TX operations till 1 subscriber has subscribed |
134 | // (in this test there's 1 subscriber only) |
135 | const uint8_t subscription_to_all_topics[] = {1}; |
136 | recv_array_expect_success (pub_socket, subscription_to_all_topics, 0); |
137 | |
138 | // Send until we block |
139 | int send_count = 0; |
140 | int recv_count = 0; |
141 | int blocked_count = 0; |
142 | int is_termination = 0; |
143 | while (send_count < msg_cnt_) { |
144 | const int rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT); |
145 | if (rc == 0) { |
146 | ++send_count; |
147 | } else if (-1 == rc) { |
148 | // if the PUB socket blocks due to HWM, errno should be EAGAIN: |
149 | blocked_count++; |
150 | TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1); |
151 | recv_count += receive (sub_socket, &is_termination); |
152 | } |
153 | } |
154 | |
155 | // if send_hwm_ < msg_cnt_, we should block at least once: |
156 | char counts_string[128]; |
157 | snprintf (counts_string, sizeof counts_string - 1, |
158 | "sent = %i, received = %i" , send_count, recv_count); |
159 | TEST_ASSERT_GREATER_THAN_INT_MESSAGE (0, blocked_count, counts_string); |
160 | |
161 | // dequeue SUB socket again, to make sure XPUB has space to send the termination message |
162 | recv_count += receive (sub_socket, &is_termination); |
163 | |
164 | // send termination message |
165 | send_string_expect_success (pub_socket, "end" , 0); |
166 | |
167 | // now block on the SUB side till we get the termination message |
168 | while (is_termination == 0) |
169 | recv_count += receive (sub_socket, &is_termination); |
170 | |
171 | // remove termination message from the count: |
172 | recv_count--; |
173 | |
174 | TEST_ASSERT_EQUAL_INT (send_count, recv_count); |
175 | |
176 | // Clean up |
177 | test_context_socket_close (sub_socket); |
178 | test_context_socket_close (pub_socket); |
179 | |
180 | return recv_count; |
181 | } |
182 | |
183 | // hwm should apply to the messages that have already been received |
184 | // with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100 |
185 | void test_reset_hwm () |
186 | { |
187 | const int first_count = 9999; |
188 | const int second_count = 1100; |
189 | int hwm = 11024; |
190 | char my_endpoint[SOCKET_STRING_LEN]; |
191 | |
192 | // Set up bind socket |
193 | void *pub_socket = test_context_socket (ZMQ_PUB); |
194 | TEST_ASSERT_SUCCESS_ERRNO ( |
195 | zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm))); |
196 | bind_loopback_ipv4 (pub_socket, my_endpoint, MAX_SOCKET_STRING); |
197 | |
198 | // Set up connect socket |
199 | void *sub_socket = test_context_socket (ZMQ_SUB); |
200 | TEST_ASSERT_SUCCESS_ERRNO ( |
201 | zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm))); |
202 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, my_endpoint)); |
203 | TEST_ASSERT_SUCCESS_ERRNO ( |
204 | zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0)); |
205 | |
206 | msleep (SETTLE_TIME); |
207 | |
208 | // Send messages |
209 | int send_count = 0; |
210 | while (send_count < first_count |
211 | && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0) |
212 | ++send_count; |
213 | TEST_ASSERT_EQUAL_INT (first_count, send_count); |
214 | |
215 | msleep (SETTLE_TIME); |
216 | |
217 | // Now receive all sent messages |
218 | int recv_count = 0; |
219 | while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) { |
220 | ++recv_count; |
221 | } |
222 | TEST_ASSERT_EQUAL_INT (first_count, recv_count); |
223 | |
224 | msleep (SETTLE_TIME); |
225 | |
226 | // Send messages |
227 | send_count = 0; |
228 | while (send_count < second_count |
229 | && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0) |
230 | ++send_count; |
231 | TEST_ASSERT_EQUAL_INT (second_count, send_count); |
232 | |
233 | msleep (SETTLE_TIME); |
234 | |
235 | // Now receive all sent messages |
236 | recv_count = 0; |
237 | while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) { |
238 | ++recv_count; |
239 | } |
240 | TEST_ASSERT_EQUAL_INT (second_count, recv_count); |
241 | |
242 | // Clean up |
243 | test_context_socket_close (sub_socket); |
244 | test_context_socket_close (pub_socket); |
245 | } |
246 | |
247 | void test_defaults_large (const char *bind_endpoint_) |
248 | { |
249 | // send 1000 msg on hwm 1000, receive 1000 |
250 | TEST_ASSERT_EQUAL_INT (1000, test_defaults (1000, 1000, bind_endpoint_)); |
251 | } |
252 | |
253 | void test_defaults_small (const char *bind_endpoint_) |
254 | { |
255 | // send 1000 msg on hwm 100, receive 100 |
256 | TEST_ASSERT_EQUAL_INT (100, test_defaults (100, 100, bind_endpoint_)); |
257 | } |
258 | |
259 | void test_blocking (const char *bind_endpoint_) |
260 | { |
261 | // send 6000 msg on hwm 2000, drops above hwm, only receive hwm: |
262 | TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000, bind_endpoint_)); |
263 | } |
264 | |
265 | #define DEFINE_REGULAR_TEST_CASES(name, bind_endpoint) \ |
266 | void test_defaults_large_##name () \ |
267 | { \ |
268 | test_defaults_large (bind_endpoint); \ |
269 | } \ |
270 | \ |
271 | void test_defaults_small_##name () \ |
272 | { \ |
273 | test_defaults_small (bind_endpoint); \ |
274 | } \ |
275 | \ |
276 | void test_blocking_##name () { test_blocking (bind_endpoint); } |
277 | |
278 | #define RUN_REGULAR_TEST_CASES(name) \ |
279 | RUN_TEST (test_defaults_large_##name); \ |
280 | RUN_TEST (test_defaults_small_##name); \ |
281 | RUN_TEST (test_blocking_##name) |
282 | |
283 | DEFINE_REGULAR_TEST_CASES (tcp, "tcp://127.0.0.1:*" ) |
284 | DEFINE_REGULAR_TEST_CASES (inproc, "inproc://a" ) |
285 | |
286 | #if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_GNU) |
287 | DEFINE_REGULAR_TEST_CASES (ipc, "ipc://*" ) |
288 | #endif |
289 | |
290 | int main () |
291 | { |
292 | setup_test_environment (); |
293 | |
294 | UNITY_BEGIN (); |
295 | |
296 | RUN_REGULAR_TEST_CASES (tcp); |
297 | RUN_REGULAR_TEST_CASES (inproc); |
298 | |
299 | #if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_GNU) |
300 | RUN_REGULAR_TEST_CASES (ipc); |
301 | #endif |
302 | RUN_TEST (test_reset_hwm); |
303 | return UNITY_END (); |
304 | } |
305 | |