1/*
2 Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33#include <stdlib.h>
34#include <string.h>
35
36SETUP_TEARDOWN_TESTCONTEXT
37
38char connect_address[MAX_SOCKET_STRING];
39
40// PUSH: SHALL route outgoing messages to connected peers using a
41// round-robin strategy.
42void test_push_round_robin_out (const char *bind_address_)
43{
44 void *push = test_context_socket (ZMQ_PUSH);
45
46 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (push, bind_address_));
47 size_t len = MAX_SOCKET_STRING;
48 TEST_ASSERT_SUCCESS_ERRNO (
49 zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, connect_address, &len));
50
51 const size_t services = 5;
52 void *pulls[services];
53 for (size_t peer = 0; peer < services; ++peer) {
54 pulls[peer] = test_context_socket (ZMQ_PULL);
55
56 int timeout = 250;
57 TEST_ASSERT_SUCCESS_ERRNO (
58 zmq_setsockopt (pulls[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
59 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pulls[peer], connect_address));
60 }
61
62 // Wait for connections.
63 msleep (SETTLE_TIME);
64
65 // Send 2N messages
66 for (size_t peer = 0; peer < services; ++peer)
67 s_send_seq (push, "ABC", SEQ_END);
68 for (size_t peer = 0; peer < services; ++peer)
69 s_send_seq (push, "DEF", SEQ_END);
70
71 // Expect every PULL got one of each
72 for (size_t peer = 0; peer < services; ++peer) {
73 s_recv_seq (pulls[peer], "ABC", SEQ_END);
74 s_recv_seq (pulls[peer], "DEF", SEQ_END);
75 }
76
77 test_context_socket_close_zero_linger (push);
78
79 for (size_t peer = 0; peer < services; ++peer)
80 test_context_socket_close_zero_linger (pulls[peer]);
81}
82
83// PULL: SHALL receive incoming messages from its peers using a fair-queuing
84// strategy.
85void test_pull_fair_queue_in (const char *bind_address_)
86{
87 void *pull = test_context_socket (ZMQ_PULL);
88
89 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pull, bind_address_));
90 size_t len = MAX_SOCKET_STRING;
91 TEST_ASSERT_SUCCESS_ERRNO (
92 zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, connect_address, &len));
93
94 const unsigned char services = 5;
95 void *pushs[services];
96 for (unsigned char peer = 0; peer < services; ++peer) {
97 pushs[peer] = test_context_socket (ZMQ_PUSH);
98
99 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pushs[peer], connect_address));
100 }
101
102 // Wait for connections.
103 msleep (SETTLE_TIME);
104
105 int first_half = 0;
106 int second_half = 0;
107
108 // Send 2N messages
109 for (unsigned char peer = 0; peer < services; ++peer) {
110 char *str = strdup ("A");
111
112 str[0] += peer;
113 s_send_seq (pushs[peer], str, SEQ_END);
114 first_half += str[0];
115
116 str[0] += services;
117 s_send_seq (pushs[peer], str, SEQ_END);
118 second_half += str[0];
119
120 free (str);
121 }
122
123 // Wait for data.
124 msleep (SETTLE_TIME);
125
126 zmq_msg_t msg;
127 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
128
129 // Expect to pull one from each first
130 for (size_t peer = 0; peer < services; ++peer) {
131 TEST_ASSERT_EQUAL_INT (
132 2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pull, 0)));
133 const char *str = static_cast<const char *> (zmq_msg_data (&msg));
134 first_half -= str[0];
135 }
136 TEST_ASSERT_EQUAL_INT (0, first_half);
137
138 // And then get the second batch
139 for (size_t peer = 0; peer < services; ++peer) {
140 TEST_ASSERT_EQUAL_INT (
141 2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pull, 0)));
142 const char *str = static_cast<const char *> (zmq_msg_data (&msg));
143 second_half -= str[0];
144 }
145 TEST_ASSERT_EQUAL_INT (0, second_half);
146
147 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
148
149 test_context_socket_close_zero_linger (pull);
150
151 for (size_t peer = 0; peer < services; ++peer)
152 test_context_socket_close_zero_linger (pushs[peer]);
153}
154
155// PUSH: SHALL block on sending, or return a suitable error, when it has no
156// available peers.
157void test_push_block_on_send_no_peers (const char *bind_address_)
158{
159 void *sc = test_context_socket (ZMQ_PUSH);
160
161 int timeout = 250;
162 TEST_ASSERT_SUCCESS_ERRNO (
163 zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)));
164
165 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, ZMQ_DONTWAIT));
166 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, 0));
167
168 test_context_socket_close (sc);
169}
170
171// PUSH and PULL: SHALL create this queue when a peer connects to it. If
172// this peer disconnects, the socket SHALL destroy its queue and SHALL
173// discard any messages it contains.
174void test_destroy_queue_on_disconnect (const char *bind_address_)
175{
176 void *a = test_context_socket (ZMQ_PUSH);
177
178 int hwm = 1;
179 TEST_ASSERT_SUCCESS_ERRNO (
180 zmq_setsockopt (a, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
181
182 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (a, bind_address_));
183 size_t len = MAX_SOCKET_STRING;
184 TEST_ASSERT_SUCCESS_ERRNO (
185 zmq_getsockopt (a, ZMQ_LAST_ENDPOINT, connect_address, &len));
186
187 void *b = test_context_socket (ZMQ_PULL);
188
189 TEST_ASSERT_SUCCESS_ERRNO (
190 zmq_setsockopt (b, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
191
192 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));
193
194 // Send two messages, one should be stuck in A's outgoing queue, the other
195 // arrives at B.
196 s_send_seq (a, "ABC", SEQ_END);
197 s_send_seq (a, "DEF", SEQ_END);
198
199 // Both queues should now be full, indicated by A blocking on send.
200 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT));
201
202 TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (b, connect_address));
203
204 // Disconnect may take time and need command processing.
205 zmq_pollitem_t poller[2] = {{a, 0, 0, 0}, {b, 0, 0, 0}};
206 TEST_ASSERT_EQUAL_INT (
207 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100)));
208 TEST_ASSERT_EQUAL_INT (
209 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100)));
210
211 zmq_msg_t msg;
212 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
213
214 // Can't receive old data on B.
215 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, b, ZMQ_DONTWAIT));
216
217 // Sending fails.
218 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT));
219
220 // Reconnect B
221 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));
222
223 // Still can't receive old data on B.
224 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, b, ZMQ_DONTWAIT));
225
226 // two messages should be sendable before the queues are filled up.
227 s_send_seq (a, "ABC", SEQ_END);
228 s_send_seq (a, "DEF", SEQ_END);
229
230 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT));
231
232 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
233
234 test_context_socket_close_zero_linger (a);
235 test_context_socket_close_zero_linger (b);
236}
237
238// PUSH and PULL: SHALL either receive or drop multipart messages atomically.
239void test_push_multipart_atomic_drop (const char *bind_address_,
240 const bool block_)
241{
242 int linger = 0;
243 int hwm = 1;
244
245 void *push = test_context_socket (ZMQ_PUSH);
246 TEST_ASSERT_SUCCESS_ERRNO (
247 zmq_setsockopt (push, ZMQ_LINGER, &linger, sizeof (linger)));
248 TEST_ASSERT_SUCCESS_ERRNO (
249 zmq_setsockopt (push, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
250 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (push, bind_address_));
251 size_t addr_len = MAX_SOCKET_STRING;
252 TEST_ASSERT_SUCCESS_ERRNO (
253 zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, connect_address, &addr_len));
254
255 void *pull = test_context_socket (ZMQ_PULL);
256 TEST_ASSERT_SUCCESS_ERRNO (
257 zmq_setsockopt (pull, ZMQ_LINGER, &linger, sizeof (linger)));
258 TEST_ASSERT_SUCCESS_ERRNO (
259 zmq_setsockopt (pull, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
260 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pull, connect_address));
261
262 // Wait for connections.
263 msleep (SETTLE_TIME);
264
265 int rc;
266 zmq_msg_t msg_data;
267 // A large message is needed to overrun the TCP buffers
268 const size_t len = 16 * 1024 * 1024;
269 size_t zmq_events_size = sizeof (int);
270 int zmq_events;
271
272 // Normal case - excercise the queues
273 send_string_expect_success (push, "0", ZMQ_SNDMORE);
274 send_string_expect_success (push, "0", ZMQ_SNDMORE);
275 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len));
276 memset (zmq_msg_data (&msg_data), 'a', len);
277 TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0));
278
279 recv_string_expect_success (pull, "0", 0);
280 recv_string_expect_success (pull, "0", 0);
281 zmq_msg_init (&msg_data);
282 TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0));
283 zmq_msg_close (&msg_data);
284
285 // Fill the HWMs of sender and receiver, one message each
286 send_string_expect_success (push, "1", 0);
287
288 send_string_expect_success (push, "2", ZMQ_SNDMORE);
289 send_string_expect_success (push, "2", ZMQ_SNDMORE);
290 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len));
291 memset (zmq_msg_data (&msg_data), 'b', len);
292 TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0));
293
294 // Disconnect and simulate a poll (doesn't work on Windows) to
295 // let the commands run and let the pipes start to be deallocated
296 TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (pull, connect_address));
297
298 zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
299 zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
300 msleep (SETTLE_TIME);
301 zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
302 zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
303
304 // Reconnect and immediately push a large message into the pipe,
305 // if the problem is reproduced the pipe is in the process of being
306 // terminated but still exists (state term_ack_sent) and had already
307 // accepted the frame, so with the first frames already gone and
308 // unreachable only the last is left, and is stuck in the lb.
309 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pull, connect_address));
310
311 send_string_expect_success (push, "3", ZMQ_SNDMORE);
312 send_string_expect_success (push, "3", ZMQ_SNDMORE);
313 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len));
314 memset (zmq_msg_data (&msg_data), 'c', len);
315 if (block_) {
316 TEST_ASSERT_EQUAL_INT (len,
317 zmq_msg_send (&msg_data, push, ZMQ_SNDMORE));
318 } else {
319 rc = zmq_msg_send (&msg_data, push, ZMQ_SNDMORE | ZMQ_DONTWAIT);
320 // inproc won't fail, much faster to connect/disconnect pipes than TCP
321 if (rc == -1) {
322 // at this point the new pipe is there and it works
323 send_string_expect_success (push, "3", ZMQ_SNDMORE);
324 send_string_expect_success (push, "3", ZMQ_SNDMORE);
325 TEST_ASSERT_EQUAL_INT (len,
326 zmq_msg_send (&msg_data, push, ZMQ_SNDMORE));
327 }
328 }
329 send_string_expect_success (push, "3b", 0);
330
331 zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
332 zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
333 msleep (SETTLE_TIME);
334 zmq_getsockopt (push, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
335 zmq_getsockopt (pull, ZMQ_EVENTS, &zmq_events, &zmq_events_size);
336
337 send_string_expect_success (push, "5", ZMQ_SNDMORE);
338 send_string_expect_success (push, "5", ZMQ_SNDMORE);
339 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg_data, len));
340 memset (zmq_msg_data (&msg_data), 'd', len);
341 TEST_ASSERT_EQUAL_INT (len, zmq_msg_send (&msg_data, push, 0));
342
343 // On very slow machines the message will not be lost, as it will
344 // be sent when the new pipe is already in place, so avoid failing
345 // and simply carry on as it would be very noisy otherwise.
346 // Receive both to avoid leaking metadata.
347 // If only the "5" message is received, the problem is reproduced, and
348 // without the fix the first message received would be the last large
349 // frame of "3".
350 char buffer[2];
351 rc =
352 TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (pull, buffer, sizeof (buffer), 0));
353 TEST_ASSERT_EQUAL_INT (1, rc);
354 TEST_ASSERT_TRUE (buffer[0] == '3' || buffer[0] == '5');
355 if (buffer[0] == '3') {
356 recv_string_expect_success (pull, "3", 0);
357 zmq_msg_init (&msg_data);
358 TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0));
359 zmq_msg_close (&msg_data);
360 recv_string_expect_success (pull, "3b", 0);
361 recv_string_expect_success (pull, "5", 0);
362 }
363 recv_string_expect_success (pull, "5", 0);
364 zmq_msg_init (&msg_data);
365 TEST_ASSERT_EQUAL_INT (len, zmq_msg_recv (&msg_data, pull, 0));
366 zmq_msg_close (&msg_data);
367
368 test_context_socket_close_zero_linger (pull);
369 test_context_socket_close_zero_linger (push);
370}
371
372#define def_test_spec_pushpull(name, bind_address_) \
373 void test_spec_pushpull_##name##_push_round_robin_out () \
374 { \
375 test_push_round_robin_out (bind_address_); \
376 } \
377 void test_spec_pushpull_##name##_pull_fair_queue_in () \
378 { \
379 test_pull_fair_queue_in (bind_address_); \
380 } \
381 void test_spec_pushpull_##name##_push_block_on_send_no_peers () \
382 { \
383 test_push_block_on_send_no_peers (bind_address_); \
384 } \
385 void test_spec_pushpull_##name##_destroy_queue_on_disconnect () \
386 { \
387 test_destroy_queue_on_disconnect (bind_address_); \
388 } \
389 void test_spec_pushpull_##name##_push_multipart_atomic_drop_block () \
390 { \
391 test_push_multipart_atomic_drop (bind_address_, true); \
392 } \
393 void test_spec_pushpull_##name##_push_multipart_atomic_drop_non_block () \
394 { \
395 test_push_multipart_atomic_drop (bind_address_, false); \
396 }
397
398def_test_spec_pushpull (inproc, "inproc://a")
399
400 def_test_spec_pushpull (tcp, "tcp://127.0.0.1:*")
401
402 int main ()
403{
404 setup_test_environment ();
405
406 UNITY_BEGIN ();
407 RUN_TEST (test_spec_pushpull_inproc_push_round_robin_out);
408 RUN_TEST (test_spec_pushpull_tcp_push_round_robin_out);
409 RUN_TEST (test_spec_pushpull_inproc_pull_fair_queue_in);
410 RUN_TEST (test_spec_pushpull_tcp_pull_fair_queue_in);
411 RUN_TEST (test_spec_pushpull_inproc_push_block_on_send_no_peers);
412 RUN_TEST (test_spec_pushpull_tcp_push_block_on_send_no_peers);
413 // TODO Tests disabled until libzmq does this properly
414 //RUN_TEST (test_spec_pushpull_inproc_destroy_queue_on_disconnect);
415 //RUN_TEST (test_spec_pushpull_tcp_destroy_queue_on_disconnect);
416 RUN_TEST (test_spec_pushpull_inproc_push_multipart_atomic_drop_block);
417 RUN_TEST (test_spec_pushpull_inproc_push_multipart_atomic_drop_non_block);
418 RUN_TEST (test_spec_pushpull_tcp_push_multipart_atomic_drop_block);
419 RUN_TEST (test_spec_pushpull_tcp_push_multipart_atomic_drop_non_block);
420 return UNITY_END ();
421}
422