1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31
32#include "testutil.hpp"
33#include "testutil_unity.hpp"
34
35#include <unity.h>
36
37const size_t services = 5;
38
39void *req;
40void *rep[services];
41
42void setUp ()
43{
44 setup_test_context ();
45
46 char my_endpoint[MAX_SOCKET_STRING];
47 req = test_context_socket (ZMQ_REQ);
48
49 int enabled = 1;
50 TEST_ASSERT_SUCCESS_ERRNO (
51 zmq_setsockopt (req, ZMQ_REQ_RELAXED, &enabled, sizeof (int)));
52
53 TEST_ASSERT_SUCCESS_ERRNO (
54 zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int)));
55
56 bind_loopback_ipv4 (req, my_endpoint, sizeof (my_endpoint));
57
58 for (size_t peer = 0; peer < services; peer++) {
59 rep[peer] = test_context_socket (ZMQ_REP);
60
61 int timeout = 500;
62 TEST_ASSERT_SUCCESS_ERRNO (
63 zmq_setsockopt (rep[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)));
64
65 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (rep[peer], my_endpoint));
66 }
67 // We have to give the connects time to finish otherwise the requests
68 // will not properly round-robin. We could alternatively connect the
69 // REQ sockets to the REP sockets.
70 msleep (SETTLE_TIME);
71}
72
73void tearDown ()
74{
75 test_context_socket_close_zero_linger (req);
76 for (size_t peer = 0; peer < services; peer++)
77 test_context_socket_close_zero_linger (rep[peer]);
78
79 teardown_test_context ();
80}
81
82static void bounce (void *socket_)
83{
84 int more;
85 size_t more_size = sizeof (more);
86 do {
87 zmq_msg_t recv_part, sent_part;
88 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&recv_part));
89
90 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&recv_part, socket_, 0));
91
92 TEST_ASSERT_SUCCESS_ERRNO (
93 zmq_getsockopt (socket_, ZMQ_RCVMORE, &more, &more_size));
94
95 zmq_msg_init (&sent_part);
96 zmq_msg_copy (&sent_part, &recv_part);
97
98 TEST_ASSERT_SUCCESS_ERRNO (
99 zmq_msg_send (&sent_part, socket_, more ? ZMQ_SNDMORE : 0));
100
101 zmq_msg_close (&recv_part);
102 } while (more);
103}
104
105static int get_events (void *socket_)
106{
107 int events;
108 size_t events_size = sizeof (events);
109 TEST_ASSERT_SUCCESS_ERRNO (
110 zmq_getsockopt (socket_, ZMQ_EVENTS, &events, &events_size));
111 return events;
112}
113
114void test_case_1 ()
115{
116 // Case 1: Second send() before a reply arrives in a pipe.
117
118 int events = get_events (req);
119 TEST_ASSERT_EQUAL_INT (ZMQ_POLLOUT, events);
120
121 // Send a request, ensure it arrives, don't send a reply
122 s_send_seq (req, "A", "B", SEQ_END);
123 s_recv_seq (rep[0], "A", "B", SEQ_END);
124
125 events = get_events (req);
126 TEST_ASSERT_EQUAL_INT (ZMQ_POLLOUT, events);
127
128 // Send another request on the REQ socket
129 s_send_seq (req, "C", "D", SEQ_END);
130 s_recv_seq (rep[1], "C", "D", SEQ_END);
131
132 events = get_events (req);
133 TEST_ASSERT_EQUAL_INT (ZMQ_POLLOUT, events);
134
135 // Send a reply to the first request - that should be discarded by the REQ
136 s_send_seq (rep[0], "WRONG", SEQ_END);
137
138 // Send the expected reply
139 s_send_seq (rep[1], "OK", SEQ_END);
140 s_recv_seq (req, "OK", SEQ_END);
141
142 // Another standard req-rep cycle, just to check
143 s_send_seq (req, "E", SEQ_END);
144 s_recv_seq (rep[2], "E", SEQ_END);
145 s_send_seq (rep[2], "F", "G", SEQ_END);
146 s_recv_seq (req, "F", "G", SEQ_END);
147}
148
149void test_case_2 ()
150{
151 // Case 2: Second send() after a reply is already in a pipe on the REQ.
152
153 // TODO instead of rerunning the previous test cases, only do the relevant parts (or change the peer)
154 test_case_1 ();
155
156 // Send a request, ensure it arrives, send a reply
157 s_send_seq (req, "H", SEQ_END);
158 s_recv_seq (rep[3], "H", SEQ_END);
159 s_send_seq (rep[3], "BAD", SEQ_END);
160
161 // Wait for message to be there.
162 msleep (SETTLE_TIME);
163
164 // Without receiving that reply, send another request on the REQ socket
165 s_send_seq (req, "I", SEQ_END);
166 s_recv_seq (rep[4], "I", SEQ_END);
167
168 // Send the expected reply
169 s_send_seq (rep[4], "GOOD", SEQ_END);
170 s_recv_seq (req, "GOOD", SEQ_END);
171}
172
173void test_case_3 ()
174{
175 // Case 3: Check issue #1690. Two send() in a row should not close the
176 // communication pipes. For example pipe from req to rep[0] should not be
177 // closed after executing Case 1. So rep[0] should be the next to receive,
178 // not rep[1].
179
180 // TODO instead of rerunning the previous test cases, only do the relevant parts (or change the peer)
181 test_case_2 ();
182
183 s_send_seq (req, "J", SEQ_END);
184 s_recv_seq (rep[0], "J", SEQ_END);
185}
186
187void test_case_4 ()
188{
189 // TODO this test case does not use the sockets from setUp
190
191 // Case 4: Check issue #1695. As messages may pile up before a responder
192 // is available, we check that responses to messages other than the last
193 // sent one are correctly discarded by the REQ pipe
194
195 // Setup REQ socket as client
196 void *req = test_context_socket (ZMQ_REQ);
197
198 int enabled = 1;
199 TEST_ASSERT_SUCCESS_ERRNO (
200 zmq_setsockopt (req, ZMQ_REQ_RELAXED, &enabled, sizeof (int)));
201 TEST_ASSERT_SUCCESS_ERRNO (
202 zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int)));
203
204 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (req, ENDPOINT_0));
205
206 // Setup ROUTER socket as server but do not bind it just yet
207 void *router = test_context_socket (ZMQ_ROUTER);
208
209 // Send two requests
210 s_send_seq (req, "TO_BE_DISCARDED", SEQ_END);
211 s_send_seq (req, "TO_BE_ANSWERED", SEQ_END);
212
213 // Bind server allowing it to receive messages
214 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (router, ENDPOINT_0));
215
216 // Read the two messages and send them back as is
217 bounce (router);
218 bounce (router);
219
220 // Read the expected correlated reply. As the ZMQ_REQ_CORRELATE is active,
221 // the expected answer is "TO_BE_ANSWERED", not "TO_BE_DISCARDED".
222 s_recv_seq (req, "TO_BE_ANSWERED", SEQ_END);
223
224 test_context_socket_close_zero_linger (req);
225 test_context_socket_close_zero_linger (router);
226}
227
228int main ()
229{
230 setup_test_environment ();
231
232 UNITY_BEGIN ();
233 RUN_TEST (test_case_1);
234 RUN_TEST (test_case_2);
235 RUN_TEST (test_case_3);
236 RUN_TEST (test_case_4);
237 return UNITY_END ();
238}
239