1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33SETUP_TEARDOWN_TESTCONTEXT
34
35// SHALL route outgoing messages to available peers using a round-robin
36// strategy.
37void test_round_robin_out (const char *bind_address_)
38{
39 void *dealer = test_context_socket (ZMQ_DEALER);
40
41 char connect_address[MAX_SOCKET_STRING];
42 test_bind (dealer, bind_address_, connect_address,
43 sizeof (connect_address));
44
45 const size_t services = 5;
46 void *rep[services];
47 for (size_t peer = 0; peer < services; ++peer) {
48 rep[peer] = test_context_socket (ZMQ_REP);
49
50 int timeout = 250;
51 TEST_ASSERT_SUCCESS_ERRNO (
52 zmq_setsockopt (rep[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
53
54 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (rep[peer], connect_address));
55 }
56
57 // Wait for connections.
58 msleep (SETTLE_TIME);
59
60 // Send all requests
61 for (size_t i = 0; i < services; ++i)
62 s_send_seq (dealer, 0, "ABC", SEQ_END);
63
64 // Expect every REP got one message
65 zmq_msg_t msg;
66 zmq_msg_init (&msg);
67
68 for (size_t peer = 0; peer < services; ++peer)
69 s_recv_seq (rep[peer], "ABC", SEQ_END);
70
71 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
72
73 test_context_socket_close_zero_linger (dealer);
74
75 for (size_t peer = 0; peer < services; ++peer)
76 test_context_socket_close_zero_linger (rep[peer]);
77}
78
79// SHALL receive incoming messages from its peers using a fair-queuing
80// strategy.
81void test_fair_queue_in (const char *bind_address_)
82{
83 void *receiver = test_context_socket (ZMQ_DEALER);
84
85 int timeout = 250;
86 TEST_ASSERT_SUCCESS_ERRNO (
87 zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
88
89 char connect_address[MAX_SOCKET_STRING];
90 test_bind (receiver, bind_address_, connect_address,
91 sizeof (connect_address));
92
93 const size_t services = 5;
94 void *senders[services];
95 for (size_t peer = 0; peer < services; ++peer) {
96 senders[peer] = test_context_socket (ZMQ_DEALER);
97
98 TEST_ASSERT_SUCCESS_ERRNO (
99 zmq_setsockopt (senders[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
100
101 TEST_ASSERT_SUCCESS_ERRNO (
102 zmq_connect (senders[peer], connect_address));
103 }
104
105 zmq_msg_t msg;
106 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
107
108 s_send_seq (senders[0], "A", SEQ_END);
109 s_recv_seq (receiver, "A", SEQ_END);
110
111 s_send_seq (senders[0], "A", SEQ_END);
112 s_recv_seq (receiver, "A", SEQ_END);
113
114 // send our requests
115 for (size_t peer = 0; peer < services; ++peer)
116 s_send_seq (senders[peer], "B", SEQ_END);
117
118 // Wait for data.
119 msleep (SETTLE_TIME);
120
121 // handle the requests
122 for (size_t peer = 0; peer < services; ++peer)
123 s_recv_seq (receiver, "B", SEQ_END);
124
125 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
126
127 test_context_socket_close_zero_linger (receiver);
128
129 for (size_t peer = 0; peer < services; ++peer)
130 test_context_socket_close_zero_linger (senders[peer]);
131}
132
133// SHALL create a double queue when a peer connects to it. If this peer
134// disconnects, the DEALER socket SHALL destroy its double queue and SHALL
135// discard any messages it contains.
136void test_destroy_queue_on_disconnect (const char *bind_address_)
137{
138 void *a = test_context_socket (ZMQ_DEALER);
139
140 char connect_address[MAX_SOCKET_STRING];
141 test_bind (a, bind_address_, connect_address, sizeof (connect_address));
142
143 void *b = test_context_socket (ZMQ_DEALER);
144
145 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));
146
147 // Send a message in both directions
148 s_send_seq (a, "ABC", SEQ_END);
149 s_send_seq (b, "DEF", SEQ_END);
150
151 TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (b, connect_address));
152
153 // Disconnect may take time and need command processing.
154 zmq_pollitem_t poller[2] = {{a, 0, 0, 0}, {b, 0, 0, 0}};
155 TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100));
156 TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100));
157
158 // No messages should be available, sending should fail.
159 zmq_msg_t msg;
160 zmq_msg_init (&msg);
161
162 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT));
163
164 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, a, ZMQ_DONTWAIT));
165
166 // After a reconnect of B, the messages should still be gone
167 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address));
168
169 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, a, ZMQ_DONTWAIT));
170
171 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, b, ZMQ_DONTWAIT));
172
173 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
174
175 test_context_socket_close_zero_linger (a);
176 test_context_socket_close_zero_linger (b);
177}
178
179// SHALL block on sending, or return a suitable error, when it has no connected peers.
180void test_block_on_send_no_peers (const char *bind_address_)
181{
182 void *sc = test_context_socket (ZMQ_DEALER);
183
184 int timeout = 250;
185 TEST_ASSERT_SUCCESS_ERRNO (
186 zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)));
187
188 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, ZMQ_DONTWAIT));
189 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, 0));
190
191 test_context_socket_close (sc);
192}
193
194#define TEST_CASES(name, bind_address) \
195 void test_round_robin_out_##name () \
196 { \
197 test_round_robin_out (bind_address); \
198 } \
199 void test_fair_queue_in_##name () { test_fair_queue_in (bind_address); } \
200 void test_block_on_send_no_peers_##name () \
201 { \
202 test_block_on_send_no_peers (bind_address); \
203 }
204
205TEST_CASES (inproc, "inproc://a")
206TEST_CASES (tcp, "tcp://127.0.0.1:*")
207
208int main (void)
209{
210 setup_test_environment ();
211
212 UNITY_BEGIN ();
213
214 RUN_TEST (test_round_robin_out_inproc);
215 RUN_TEST (test_fair_queue_in_inproc);
216 RUN_TEST (test_block_on_send_no_peers_inproc);
217
218 RUN_TEST (test_round_robin_out_tcp);
219 RUN_TEST (test_fair_queue_in_tcp);
220 RUN_TEST (test_block_on_send_no_peers_tcp);
221
222 // TODO *** Test disabled until libzmq does this properly ***
223 // test_destroy_queue_on_disconnect (ctx);
224
225 return UNITY_END ();
226}
227