1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "testutil.hpp" |
31 | #include "testutil_unity.hpp" |
32 | |
// NOTE(review): presumably expands (via testutil_unity.hpp) to the Unity
// per-test setup/teardown hooks that manage the context returned by
// get_test_context () -- confirm against the macro definition.
33 | SETUP_TEARDOWN_TESTCONTEXT |
34 | |
35 | static void pusher (void * /*unused*/) |
36 | { |
37 | // Connect first |
38 | // do not use test_context_socket here, as it is not thread-safe |
39 | void *connect_socket = zmq_socket (get_test_context (), ZMQ_PAIR); |
40 | |
41 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://sink" )); |
42 | |
43 | // Queue up some data |
44 | send_string_expect_success (connect_socket, "foobar" , 0); |
45 | |
46 | // Cleanup |
47 | TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket)); |
48 | } |
49 | |
50 | static void simult_conn (void *endpt_) |
51 | { |
52 | // Pull out arguments - endpoint string |
53 | const char *endpt = static_cast<const char *> (endpt_); |
54 | |
55 | // Connect |
56 | // do not use test_context_socket here, as it is not thread-safe |
57 | void *connect_socket = zmq_socket (get_test_context (), ZMQ_SUB); |
58 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, endpt)); |
59 | |
60 | // Cleanup |
61 | TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket)); |
62 | } |
63 | |
64 | static void simult_bind (void *endpt_) |
65 | { |
66 | // Pull out arguments - context followed by endpoint string |
67 | const char *endpt = static_cast<const char *> (endpt_); |
68 | |
69 | // Bind |
70 | // do not use test_context_socket here, as it is not thread-safe |
71 | void *bind_socket = zmq_socket (get_test_context (), ZMQ_PUB); |
72 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, endpt)); |
73 | |
74 | // Cleanup |
75 | TEST_ASSERT_SUCCESS_ERRNO (zmq_close (bind_socket)); |
76 | } |
77 | |
78 | void test_bind_before_connect () |
79 | { |
80 | // Bind first |
81 | void *bind_socket = test_context_socket (ZMQ_PAIR); |
82 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://bbc" )); |
83 | |
84 | // Now connect |
85 | void *connect_socket = test_context_socket (ZMQ_PAIR); |
86 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://bbc" )); |
87 | |
88 | // Queue up some data |
89 | send_string_expect_success (connect_socket, "foobar" , 0); |
90 | |
91 | // Read pending message |
92 | recv_string_expect_success (bind_socket, "foobar" , 0); |
93 | |
94 | // Cleanup |
95 | test_context_socket_close (connect_socket); |
96 | test_context_socket_close (bind_socket); |
97 | } |
98 | |
99 | void test_connect_before_bind () |
100 | { |
101 | // Connect first |
102 | void *connect_socket = test_context_socket (ZMQ_PAIR); |
103 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb" )); |
104 | |
105 | // Queue up some data |
106 | send_string_expect_success (connect_socket, "foobar" , 0); |
107 | |
108 | // Now bind |
109 | void *bind_socket = test_context_socket (ZMQ_PAIR); |
110 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbb" )); |
111 | |
112 | // Read pending message |
113 | recv_string_expect_success (bind_socket, "foobar" , 0); |
114 | |
115 | // Cleanup |
116 | test_context_socket_close (connect_socket); |
117 | test_context_socket_close (bind_socket); |
118 | } |
119 | |
120 | void test_connect_before_bind_pub_sub () |
121 | { |
122 | // Connect first |
123 | void *connect_socket = test_context_socket (ZMQ_PUB); |
124 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbbps" )); |
125 | |
126 | // Queue up some data, this will be dropped |
127 | send_string_expect_success (connect_socket, "before" , 0); |
128 | |
129 | // Now bind |
130 | void *bind_socket = test_context_socket (ZMQ_SUB); |
131 | TEST_ASSERT_SUCCESS_ERRNO ( |
132 | zmq_setsockopt (bind_socket, ZMQ_SUBSCRIBE, "" , 0)); |
133 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbbps" )); |
134 | |
135 | // Wait for pub-sub connection to happen |
136 | msleep (SETTLE_TIME); |
137 | |
138 | // Queue up some data, this not will be dropped |
139 | send_string_expect_success (connect_socket, "after" , 0); |
140 | |
141 | // Read pending message |
142 | recv_string_expect_success (bind_socket, "after" , 0); |
143 | |
144 | // Cleanup |
145 | test_context_socket_close (connect_socket); |
146 | test_context_socket_close (bind_socket); |
147 | } |
148 | |
149 | void test_connect_before_bind_ctx_term () |
150 | { |
151 | for (int i = 0; i < 20; ++i) { |
152 | // Connect first |
153 | void *connect_socket = test_context_socket (ZMQ_ROUTER); |
154 | |
155 | char ep[32]; |
156 | sprintf (ep, "inproc://cbbrr%d" , i); |
157 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, ep)); |
158 | |
159 | // Cleanup |
160 | test_context_socket_close (connect_socket); |
161 | } |
162 | } |
163 | |
164 | void test_multiple_connects () |
165 | { |
166 | const unsigned int no_of_connects = 10; |
167 | |
168 | void *connect_socket[no_of_connects]; |
169 | |
170 | // Connect first |
171 | for (unsigned int i = 0; i < no_of_connects; ++i) { |
172 | connect_socket[i] = test_context_socket (ZMQ_PUSH); |
173 | TEST_ASSERT_SUCCESS_ERRNO ( |
174 | zmq_connect (connect_socket[i], "inproc://multiple" )); |
175 | |
176 | // Queue up some data |
177 | send_string_expect_success (connect_socket[i], "foobar" , 0); |
178 | } |
179 | |
180 | // Now bind |
181 | void *bind_socket = test_context_socket (ZMQ_PULL); |
182 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://multiple" )); |
183 | |
184 | for (unsigned int i = 0; i < no_of_connects; ++i) { |
185 | recv_string_expect_success (bind_socket, "foobar" , 0); |
186 | } |
187 | |
188 | // Cleanup |
189 | for (unsigned int i = 0; i < no_of_connects; ++i) { |
190 | test_context_socket_close (connect_socket[i]); |
191 | } |
192 | |
193 | test_context_socket_close (bind_socket); |
194 | } |
195 | |
196 | void test_multiple_threads () |
197 | { |
198 | const unsigned int no_of_threads = 30; |
199 | |
200 | void *threads[no_of_threads]; |
201 | |
202 | // Connect first |
203 | for (unsigned int i = 0; i < no_of_threads; ++i) { |
204 | threads[i] = zmq_threadstart (&pusher, NULL); |
205 | } |
206 | |
207 | // Now bind |
208 | void *bind_socket = test_context_socket (ZMQ_PULL); |
209 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://sink" )); |
210 | |
211 | for (unsigned int i = 0; i < no_of_threads; ++i) { |
212 | // Read pending message |
213 | recv_string_expect_success (bind_socket, "foobar" , 0); |
214 | } |
215 | |
216 | // Cleanup |
217 | for (unsigned int i = 0; i < no_of_threads; ++i) { |
218 | zmq_threadclose (threads[i]); |
219 | } |
220 | |
221 | test_context_socket_close (bind_socket); |
222 | } |
223 | |
224 | void test_simultaneous_connect_bind_threads () |
225 | { |
226 | const unsigned int no_of_times = 50; |
227 | void *threads[no_of_times * 2]; |
228 | void *thr_args[no_of_times]; |
229 | char endpts[no_of_times][20]; |
230 | |
231 | // Set up thread arguments: context followed by endpoint string |
232 | for (unsigned int i = 0; i < no_of_times; ++i) { |
233 | thr_args[i] = (void *) endpts[i]; |
234 | sprintf (endpts[i], "inproc://foo_%d" , i); |
235 | } |
236 | |
237 | // Spawn all threads as simultaneously as possible |
238 | for (unsigned int i = 0; i < no_of_times; ++i) { |
239 | threads[i * 2 + 0] = zmq_threadstart (&simult_conn, thr_args[i]); |
240 | threads[i * 2 + 1] = zmq_threadstart (&simult_bind, thr_args[i]); |
241 | } |
242 | |
243 | // Close all threads |
244 | for (unsigned int i = 0; i < no_of_times; ++i) { |
245 | zmq_threadclose (threads[i * 2 + 0]); |
246 | zmq_threadclose (threads[i * 2 + 1]); |
247 | } |
248 | } |
249 | |
250 | void test_routing_id () |
251 | { |
252 | // Create the infrastructure |
253 | void *sc = test_context_socket (ZMQ_DEALER); |
254 | |
255 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "inproc://routing_id" )); |
256 | |
257 | void *sb = test_context_socket (ZMQ_ROUTER); |
258 | |
259 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "inproc://routing_id" )); |
260 | |
261 | // Send 2-part message. |
262 | TEST_ASSERT_EQUAL_INT ( |
263 | 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "A" , 1, ZMQ_SNDMORE))); |
264 | TEST_ASSERT_EQUAL_INT ( |
265 | 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "B" , 1, 0))); |
266 | |
267 | // Routing id comes first. |
268 | zmq_msg_t msg; |
269 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); |
270 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)); |
271 | TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg)); |
272 | |
273 | // Then the first part of the message body. |
274 | TEST_ASSERT_EQUAL_INT ( |
275 | 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0))); |
276 | TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg)); |
277 | |
278 | // And finally, the second part of the message body. |
279 | TEST_ASSERT_EQUAL_INT ( |
280 | 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0))); |
281 | TEST_ASSERT_EQUAL_INT (0, zmq_msg_more (&msg)); |
282 | |
283 | // Deallocate the infrastructure. |
284 | test_context_socket_close (sc); |
285 | test_context_socket_close (sb); |
286 | } |
287 | |
288 | void test_connect_only () |
289 | { |
290 | void *connect_socket = test_context_socket (ZMQ_PUSH); |
291 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://a" )); |
292 | |
293 | test_context_socket_close (connect_socket); |
294 | } |
295 | |
296 | |
297 | void test_unbind () |
298 | { |
299 | // Bind and unbind socket 1 |
300 | void *bind_socket1 = test_context_socket (ZMQ_PAIR); |
301 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket1, "inproc://unbind" )); |
302 | TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (bind_socket1, "inproc://unbind" )); |
303 | |
304 | // Bind socket 2 |
305 | void *bind_socket2 = test_context_socket (ZMQ_PAIR); |
306 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket2, "inproc://unbind" )); |
307 | |
308 | // Now connect |
309 | void *connect_socket = test_context_socket (ZMQ_PAIR); |
310 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://unbind" )); |
311 | |
312 | // Queue up some data |
313 | send_string_expect_success (connect_socket, "foobar" , 0); |
314 | |
315 | // Read pending message |
316 | recv_string_expect_success (bind_socket2, "foobar" , 0); |
317 | |
318 | // Cleanup |
319 | test_context_socket_close (connect_socket); |
320 | test_context_socket_close (bind_socket1); |
321 | test_context_socket_close (bind_socket2); |
322 | } |
323 | |
324 | void test_shutdown_during_pend () |
325 | { |
326 | // Connect first |
327 | void *connect_socket = test_context_socket (ZMQ_PAIR); |
328 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb" )); |
329 | |
330 | zmq_ctx_shutdown (get_test_context ()); |
331 | |
332 | // Cleanup |
333 | test_context_socket_close (connect_socket); |
334 | } |
335 | |
336 | int main (void) |
337 | { |
338 | setup_test_environment (); |
339 | |
340 | UNITY_BEGIN (); |
341 | RUN_TEST (test_bind_before_connect); |
342 | RUN_TEST (test_connect_before_bind); |
343 | RUN_TEST (test_connect_before_bind_pub_sub); |
344 | RUN_TEST (test_connect_before_bind_ctx_term); |
345 | RUN_TEST (test_multiple_connects); |
346 | RUN_TEST (test_multiple_threads); |
347 | RUN_TEST (test_simultaneous_connect_bind_threads); |
348 | RUN_TEST (test_routing_id); |
349 | RUN_TEST (test_connect_only); |
350 | RUN_TEST (test_unbind); |
351 | RUN_TEST (test_shutdown_during_pend); |
352 | return UNITY_END (); |
353 | } |
354 | |