1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33SETUP_TEARDOWN_TESTCONTEXT
34
35char connect_address[MAX_SOCKET_STRING];
36
37void test_round_robin_out (const char *bind_address_)
38{
39 void *req = test_context_socket (ZMQ_REQ);
40
41 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (req, bind_address_));
42 size_t len = MAX_SOCKET_STRING;
43 TEST_ASSERT_SUCCESS_ERRNO (
44 zmq_getsockopt (req, ZMQ_LAST_ENDPOINT, connect_address, &len));
45
46 const size_t services = 5;
47 void *rep[services];
48 for (size_t peer = 0; peer < services; peer++) {
49 rep[peer] = test_context_socket (ZMQ_REP);
50
51 int timeout = 250;
52 TEST_ASSERT_SUCCESS_ERRNO (
53 zmq_setsockopt (rep[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
54 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (rep[peer], connect_address));
55 }
56 // We have to give the connects time to finish otherwise the requests
57 // will not properly round-robin. We could alternatively connect the
58 // REQ sockets to the REP sockets.
59 msleep (SETTLE_TIME);
60
61 // Send our peer-replies, and expect every REP it used once in order
62 for (size_t peer = 0; peer < services; peer++) {
63 s_send_seq (req, "ABC", SEQ_END);
64 s_recv_seq (rep[peer], "ABC", SEQ_END);
65 s_send_seq (rep[peer], "DEF", SEQ_END);
66 s_recv_seq (req, "DEF", SEQ_END);
67 }
68
69 test_context_socket_close_zero_linger (req);
70 for (size_t peer = 0; peer < services; peer++)
71 test_context_socket_close_zero_linger (rep[peer]);
72}
73
74void test_req_only_listens_to_current_peer (const char *bind_address_)
75{
76 void *req = test_context_socket (ZMQ_REQ);
77
78 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (req, ZMQ_ROUTING_ID, "A", 2));
79
80 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (req, bind_address_));
81 size_t len = MAX_SOCKET_STRING;
82 TEST_ASSERT_SUCCESS_ERRNO (
83 zmq_getsockopt (req, ZMQ_LAST_ENDPOINT, connect_address, &len));
84
85 const size_t services = 3;
86 void *router[services];
87
88 for (size_t i = 0; i < services; ++i) {
89 router[i] = test_context_socket (ZMQ_ROUTER);
90
91 int timeout = 250;
92 TEST_ASSERT_SUCCESS_ERRNO (
93 zmq_setsockopt (router[i], ZMQ_RCVTIMEO, &timeout, sizeof (timeout)));
94
95 int enabled = 1;
96 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
97 router[i], ZMQ_ROUTER_MANDATORY, &enabled, sizeof (enabled)));
98
99 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (router[i], connect_address));
100 }
101
102 // Wait for connects to finish.
103 msleep (SETTLE_TIME);
104
105 for (size_t i = 0; i < services; ++i) {
106 // There still is a race condition when a stale peer's message
107 // arrives at the REQ just after a request was sent to that peer.
108 // To avoid that happening in the test, sleep for a bit.
109 TEST_ASSERT_EQUAL_INT (1,
110 TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (0, 0, 10)));
111
112 s_send_seq (req, "ABC", SEQ_END);
113
114 // Receive on router i
115 s_recv_seq (router[i], "A", 0, "ABC", SEQ_END);
116
117 // Send back replies on all routers
118 for (size_t j = 0; j < services; ++j) {
119 const char *replies[] = {"WRONG", "GOOD"};
120 const char *reply = replies[i == j ? 1 : 0];
121 s_send_seq (router[j], "A", 0, reply, SEQ_END);
122 }
123
124 // Receive only the good reply
125 s_recv_seq (req, "GOOD", SEQ_END);
126 }
127
128 test_context_socket_close_zero_linger (req);
129 for (size_t i = 0; i < services; ++i)
130 test_context_socket_close_zero_linger (router[i]);
131}
132
133void test_req_message_format (const char *bind_address_)
134{
135 void *req = test_context_socket (ZMQ_REQ);
136 void *router = test_context_socket (ZMQ_ROUTER);
137
138 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (req, bind_address_));
139 size_t len = MAX_SOCKET_STRING;
140 TEST_ASSERT_SUCCESS_ERRNO (
141 zmq_getsockopt (req, ZMQ_LAST_ENDPOINT, connect_address, &len));
142
143 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (router, connect_address));
144
145 // Send a multi-part request.
146 s_send_seq (req, "ABC", "DEF", SEQ_END);
147
148 zmq_msg_t msg;
149 zmq_msg_init (&msg);
150
151 // Receive peer routing id
152 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, router, 0));
153 TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&msg));
154 zmq_msg_t peer_id_msg;
155 zmq_msg_init (&peer_id_msg);
156 zmq_msg_copy (&peer_id_msg, &msg);
157
158 int more = 0;
159 size_t more_size = sizeof (more);
160 TEST_ASSERT_SUCCESS_ERRNO (
161 zmq_getsockopt (router, ZMQ_RCVMORE, &more, &more_size));
162 TEST_ASSERT_TRUE (more);
163
164 // Receive the rest.
165 s_recv_seq (router, 0, "ABC", "DEF", SEQ_END);
166
167 // Send back a single-part reply.
168 TEST_ASSERT_SUCCESS_ERRNO (
169 zmq_msg_send (&peer_id_msg, router, ZMQ_SNDMORE));
170 s_send_seq (router, 0, "GHI", SEQ_END);
171
172 // Receive reply.
173 s_recv_seq (req, "GHI", SEQ_END);
174
175 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
176 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_id_msg));
177
178 test_context_socket_close_zero_linger (req);
179 test_context_socket_close_zero_linger (router);
180}
181
182void test_block_on_send_no_peers ()
183{
184 void *sc = test_context_socket (ZMQ_REQ);
185
186 int timeout = 250;
187 TEST_ASSERT_SUCCESS_ERRNO (
188 zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)));
189
190 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, ZMQ_DONTWAIT));
191 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, 0));
192
193 test_context_socket_close (sc);
194}
195
196const char bind_inproc[] = "inproc://a";
197const char bind_tcp[] = "tcp://127.0.0.1:*";
198
199void test_round_robin_out_inproc ()
200{
201 test_round_robin_out (bind_inproc);
202}
203
204void test_round_robin_out_tcp ()
205{
206 test_round_robin_out (bind_tcp);
207}
208
209void test_req_message_format_inproc ()
210{
211 test_req_message_format (bind_inproc);
212}
213
214void test_req_message_format_tcp ()
215{
216 test_req_message_format (bind_tcp);
217}
218
219void test_req_only_listens_to_current_peer_inproc ()
220{
221 test_req_only_listens_to_current_peer (bind_inproc);
222}
223
224void test_req_only_listens_to_current_peer_tcp ()
225{
226 test_req_only_listens_to_current_peer (bind_tcp);
227}
228
229int main ()
230{
231 setup_test_environment ();
232
233 UNITY_BEGIN ();
234
235 // SHALL route outgoing messages to connected peers using a round-robin
236 // strategy.
237 RUN_TEST (test_round_robin_out_inproc);
238 RUN_TEST (test_round_robin_out_tcp);
239
240 // The request and reply messages SHALL have this format on the wire:
241 // * A delimiter, consisting of an empty frame, added by the REQ socket.
242 // * One or more data frames, comprising the message visible to the
243 // application.
244 RUN_TEST (test_req_message_format_inproc);
245 RUN_TEST (test_req_message_format_tcp);
246
247 // SHALL block on sending, or return a suitable error, when it has no
248 // connected peers.
249 RUN_TEST (test_block_on_send_no_peers);
250
251 // SHALL accept an incoming message only from the last peer that it sent a
252 // request to.
253 // SHALL discard silently any messages received from other peers.
254 // TODO PH: this test is still failing; disabled for now to allow build to
255 // complete.
256 // RUN_TEST (test_req_only_listens_to_current_peer_inproc);
257 // RUN_TEST (test_req_only_listens_to_current_peer_tcp);
258
259 return UNITY_END ();
260}
261