1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33SETUP_TEARDOWN_TESTCONTEXT
34
35static void pusher (void * /*unused*/)
36{
37 // Connect first
38 // do not use test_context_socket here, as it is not thread-safe
39 void *connect_socket = zmq_socket (get_test_context (), ZMQ_PAIR);
40
41 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://sink"));
42
43 // Queue up some data
44 send_string_expect_success (connect_socket, "foobar", 0);
45
46 // Cleanup
47 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket));
48}
49
50static void simult_conn (void *endpt_)
51{
52 // Pull out arguments - endpoint string
53 const char *endpt = static_cast<const char *> (endpt_);
54
55 // Connect
56 // do not use test_context_socket here, as it is not thread-safe
57 void *connect_socket = zmq_socket (get_test_context (), ZMQ_SUB);
58 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, endpt));
59
60 // Cleanup
61 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket));
62}
63
64static void simult_bind (void *endpt_)
65{
66 // Pull out arguments - context followed by endpoint string
67 const char *endpt = static_cast<const char *> (endpt_);
68
69 // Bind
70 // do not use test_context_socket here, as it is not thread-safe
71 void *bind_socket = zmq_socket (get_test_context (), ZMQ_PUB);
72 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, endpt));
73
74 // Cleanup
75 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (bind_socket));
76}
77
78void test_bind_before_connect ()
79{
80 // Bind first
81 void *bind_socket = test_context_socket (ZMQ_PAIR);
82 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://bbc"));
83
84 // Now connect
85 void *connect_socket = test_context_socket (ZMQ_PAIR);
86 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://bbc"));
87
88 // Queue up some data
89 send_string_expect_success (connect_socket, "foobar", 0);
90
91 // Read pending message
92 recv_string_expect_success (bind_socket, "foobar", 0);
93
94 // Cleanup
95 test_context_socket_close (connect_socket);
96 test_context_socket_close (bind_socket);
97}
98
99void test_connect_before_bind ()
100{
101 // Connect first
102 void *connect_socket = test_context_socket (ZMQ_PAIR);
103 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb"));
104
105 // Queue up some data
106 send_string_expect_success (connect_socket, "foobar", 0);
107
108 // Now bind
109 void *bind_socket = test_context_socket (ZMQ_PAIR);
110 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbb"));
111
112 // Read pending message
113 recv_string_expect_success (bind_socket, "foobar", 0);
114
115 // Cleanup
116 test_context_socket_close (connect_socket);
117 test_context_socket_close (bind_socket);
118}
119
120void test_connect_before_bind_pub_sub ()
121{
122 // Connect first
123 void *connect_socket = test_context_socket (ZMQ_PUB);
124 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbbps"));
125
126 // Queue up some data, this will be dropped
127 send_string_expect_success (connect_socket, "before", 0);
128
129 // Now bind
130 void *bind_socket = test_context_socket (ZMQ_SUB);
131 TEST_ASSERT_SUCCESS_ERRNO (
132 zmq_setsockopt (bind_socket, ZMQ_SUBSCRIBE, "", 0));
133 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbbps"));
134
135 // Wait for pub-sub connection to happen
136 msleep (SETTLE_TIME);
137
138 // Queue up some data, this not will be dropped
139 send_string_expect_success (connect_socket, "after", 0);
140
141 // Read pending message
142 recv_string_expect_success (bind_socket, "after", 0);
143
144 // Cleanup
145 test_context_socket_close (connect_socket);
146 test_context_socket_close (bind_socket);
147}
148
149void test_connect_before_bind_ctx_term ()
150{
151 for (int i = 0; i < 20; ++i) {
152 // Connect first
153 void *connect_socket = test_context_socket (ZMQ_ROUTER);
154
155 char ep[32];
156 sprintf (ep, "inproc://cbbrr%d", i);
157 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, ep));
158
159 // Cleanup
160 test_context_socket_close (connect_socket);
161 }
162}
163
164void test_multiple_connects ()
165{
166 const unsigned int no_of_connects = 10;
167
168 void *connect_socket[no_of_connects];
169
170 // Connect first
171 for (unsigned int i = 0; i < no_of_connects; ++i) {
172 connect_socket[i] = test_context_socket (ZMQ_PUSH);
173 TEST_ASSERT_SUCCESS_ERRNO (
174 zmq_connect (connect_socket[i], "inproc://multiple"));
175
176 // Queue up some data
177 send_string_expect_success (connect_socket[i], "foobar", 0);
178 }
179
180 // Now bind
181 void *bind_socket = test_context_socket (ZMQ_PULL);
182 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://multiple"));
183
184 for (unsigned int i = 0; i < no_of_connects; ++i) {
185 recv_string_expect_success (bind_socket, "foobar", 0);
186 }
187
188 // Cleanup
189 for (unsigned int i = 0; i < no_of_connects; ++i) {
190 test_context_socket_close (connect_socket[i]);
191 }
192
193 test_context_socket_close (bind_socket);
194}
195
196void test_multiple_threads ()
197{
198 const unsigned int no_of_threads = 30;
199
200 void *threads[no_of_threads];
201
202 // Connect first
203 for (unsigned int i = 0; i < no_of_threads; ++i) {
204 threads[i] = zmq_threadstart (&pusher, NULL);
205 }
206
207 // Now bind
208 void *bind_socket = test_context_socket (ZMQ_PULL);
209 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://sink"));
210
211 for (unsigned int i = 0; i < no_of_threads; ++i) {
212 // Read pending message
213 recv_string_expect_success (bind_socket, "foobar", 0);
214 }
215
216 // Cleanup
217 for (unsigned int i = 0; i < no_of_threads; ++i) {
218 zmq_threadclose (threads[i]);
219 }
220
221 test_context_socket_close (bind_socket);
222}
223
224void test_simultaneous_connect_bind_threads ()
225{
226 const unsigned int no_of_times = 50;
227 void *threads[no_of_times * 2];
228 void *thr_args[no_of_times];
229 char endpts[no_of_times][20];
230
231 // Set up thread arguments: context followed by endpoint string
232 for (unsigned int i = 0; i < no_of_times; ++i) {
233 thr_args[i] = (void *) endpts[i];
234 sprintf (endpts[i], "inproc://foo_%d", i);
235 }
236
237 // Spawn all threads as simultaneously as possible
238 for (unsigned int i = 0; i < no_of_times; ++i) {
239 threads[i * 2 + 0] = zmq_threadstart (&simult_conn, thr_args[i]);
240 threads[i * 2 + 1] = zmq_threadstart (&simult_bind, thr_args[i]);
241 }
242
243 // Close all threads
244 for (unsigned int i = 0; i < no_of_times; ++i) {
245 zmq_threadclose (threads[i * 2 + 0]);
246 zmq_threadclose (threads[i * 2 + 1]);
247 }
248}
249
250void test_routing_id ()
251{
252 // Create the infrastructure
253 void *sc = test_context_socket (ZMQ_DEALER);
254
255 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "inproc://routing_id"));
256
257 void *sb = test_context_socket (ZMQ_ROUTER);
258
259 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "inproc://routing_id"));
260
261 // Send 2-part message.
262 TEST_ASSERT_EQUAL_INT (
263 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "A", 1, ZMQ_SNDMORE)));
264 TEST_ASSERT_EQUAL_INT (
265 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "B", 1, 0)));
266
267 // Routing id comes first.
268 zmq_msg_t msg;
269 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
270 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0));
271 TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg));
272
273 // Then the first part of the message body.
274 TEST_ASSERT_EQUAL_INT (
275 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)));
276 TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg));
277
278 // And finally, the second part of the message body.
279 TEST_ASSERT_EQUAL_INT (
280 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)));
281 TEST_ASSERT_EQUAL_INT (0, zmq_msg_more (&msg));
282
283 // Deallocate the infrastructure.
284 test_context_socket_close (sc);
285 test_context_socket_close (sb);
286}
287
288void test_connect_only ()
289{
290 void *connect_socket = test_context_socket (ZMQ_PUSH);
291 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://a"));
292
293 test_context_socket_close (connect_socket);
294}
295
296
297void test_unbind ()
298{
299 // Bind and unbind socket 1
300 void *bind_socket1 = test_context_socket (ZMQ_PAIR);
301 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket1, "inproc://unbind"));
302 TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (bind_socket1, "inproc://unbind"));
303
304 // Bind socket 2
305 void *bind_socket2 = test_context_socket (ZMQ_PAIR);
306 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket2, "inproc://unbind"));
307
308 // Now connect
309 void *connect_socket = test_context_socket (ZMQ_PAIR);
310 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://unbind"));
311
312 // Queue up some data
313 send_string_expect_success (connect_socket, "foobar", 0);
314
315 // Read pending message
316 recv_string_expect_success (bind_socket2, "foobar", 0);
317
318 // Cleanup
319 test_context_socket_close (connect_socket);
320 test_context_socket_close (bind_socket1);
321 test_context_socket_close (bind_socket2);
322}
323
324void test_shutdown_during_pend ()
325{
326 // Connect first
327 void *connect_socket = test_context_socket (ZMQ_PAIR);
328 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb"));
329
330 zmq_ctx_shutdown (get_test_context ());
331
332 // Cleanup
333 test_context_socket_close (connect_socket);
334}
335
336int main (void)
337{
338 setup_test_environment ();
339
340 UNITY_BEGIN ();
341 RUN_TEST (test_bind_before_connect);
342 RUN_TEST (test_connect_before_bind);
343 RUN_TEST (test_connect_before_bind_pub_sub);
344 RUN_TEST (test_connect_before_bind_ctx_term);
345 RUN_TEST (test_multiple_connects);
346 RUN_TEST (test_multiple_threads);
347 RUN_TEST (test_simultaneous_connect_bind_threads);
348 RUN_TEST (test_routing_id);
349 RUN_TEST (test_connect_only);
350 RUN_TEST (test_unbind);
351 RUN_TEST (test_shutdown_during_pend);
352 return UNITY_END ();
353}
354