1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33#include <string.h>
34
35// NOTE: on OSX the endpoint returned by ZMQ_LAST_ENDPOINT may be quite long,
36// ensure we have extra space for that:
37#define SOCKET_STRING_LEN (MAX_SOCKET_STRING * 4)
38
39SETUP_TEARDOWN_TESTCONTEXT
40
41int test_defaults (int send_hwm_, int msg_cnt_, const char *endpoint_)
42{
43 char pub_endpoint[SOCKET_STRING_LEN];
44
45 // Set up and bind XPUB socket
46 void *pub_socket = test_context_socket (ZMQ_XPUB);
47 test_bind (pub_socket, endpoint_, pub_endpoint, sizeof pub_endpoint);
48
49 // Set up and connect SUB socket
50 void *sub_socket = test_context_socket (ZMQ_SUB);
51 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, pub_endpoint));
52
53 //set a hwm on publisher
54 TEST_ASSERT_SUCCESS_ERRNO (
55 zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm_, sizeof (send_hwm_)));
56 TEST_ASSERT_SUCCESS_ERRNO (
57 zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
58
59 // Wait before starting TX operations till 1 subscriber has subscribed
60 // (in this test there's 1 subscriber only)
61 const char subscription_to_all_topics[] = {1, 0};
62 recv_string_expect_success (pub_socket, subscription_to_all_topics, 0);
63
64 // Send until we reach "mute" state
65 int send_count = 0;
66 while (send_count < msg_cnt_
67 && zmq_send (pub_socket, "test message", 13, ZMQ_DONTWAIT) == 13)
68 ++send_count;
69
70 TEST_ASSERT_EQUAL_INT (send_hwm_, send_count);
71 msleep (SETTLE_TIME);
72
73 // Now receive all sent messages
74 int recv_count = 0;
75 char dummybuff[64];
76 while (13 == zmq_recv (sub_socket, &dummybuff, 64, ZMQ_DONTWAIT)) {
77 ++recv_count;
78 }
79
80 TEST_ASSERT_EQUAL_INT (send_hwm_, recv_count);
81
82 // Clean up
83 test_context_socket_close (sub_socket);
84 test_context_socket_close (pub_socket);
85
86 return recv_count;
87}
88
89int receive (void *socket_, int *is_termination_)
90{
91 int recv_count = 0;
92 *is_termination_ = 0;
93
94 // Now receive all sent messages
95 char buffer[255];
96 int len;
97 while ((len = zmq_recv (socket_, buffer, sizeof (buffer), 0)) >= 0) {
98 ++recv_count;
99
100 if (len == 3 && strncmp (buffer, "end", len) == 0) {
101 *is_termination_ = 1;
102 return recv_count;
103 }
104 }
105
106 return recv_count;
107}
108
109int test_blocking (int send_hwm_, int msg_cnt_, const char *endpoint_)
110{
111 char pub_endpoint[SOCKET_STRING_LEN];
112
113 // Set up bind socket
114 void *pub_socket = test_context_socket (ZMQ_XPUB);
115 test_bind (pub_socket, endpoint_, pub_endpoint, sizeof pub_endpoint);
116
117 // Set up connect socket
118 void *sub_socket = test_context_socket (ZMQ_SUB);
119 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, pub_endpoint));
120
121 //set a hwm on publisher
122 TEST_ASSERT_SUCCESS_ERRNO (
123 zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &send_hwm_, sizeof (send_hwm_)));
124 int wait = 1;
125 TEST_ASSERT_SUCCESS_ERRNO (
126 zmq_setsockopt (pub_socket, ZMQ_XPUB_NODROP, &wait, sizeof (wait)));
127 int timeout_ms = 10;
128 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
129 sub_socket, ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms)));
130 TEST_ASSERT_SUCCESS_ERRNO (
131 zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
132
133 // Wait before starting TX operations till 1 subscriber has subscribed
134 // (in this test there's 1 subscriber only)
135 const uint8_t subscription_to_all_topics[] = {1};
136 recv_array_expect_success (pub_socket, subscription_to_all_topics, 0);
137
138 // Send until we block
139 int send_count = 0;
140 int recv_count = 0;
141 int blocked_count = 0;
142 int is_termination = 0;
143 while (send_count < msg_cnt_) {
144 const int rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT);
145 if (rc == 0) {
146 ++send_count;
147 } else if (-1 == rc) {
148 // if the PUB socket blocks due to HWM, errno should be EAGAIN:
149 blocked_count++;
150 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
151 recv_count += receive (sub_socket, &is_termination);
152 }
153 }
154
155 // if send_hwm_ < msg_cnt_, we should block at least once:
156 char counts_string[128];
157 snprintf (counts_string, sizeof counts_string - 1,
158 "sent = %i, received = %i", send_count, recv_count);
159 TEST_ASSERT_GREATER_THAN_INT_MESSAGE (0, blocked_count, counts_string);
160
161 // dequeue SUB socket again, to make sure XPUB has space to send the termination message
162 recv_count += receive (sub_socket, &is_termination);
163
164 // send termination message
165 send_string_expect_success (pub_socket, "end", 0);
166
167 // now block on the SUB side till we get the termination message
168 while (is_termination == 0)
169 recv_count += receive (sub_socket, &is_termination);
170
171 // remove termination message from the count:
172 recv_count--;
173
174 TEST_ASSERT_EQUAL_INT (send_count, recv_count);
175
176 // Clean up
177 test_context_socket_close (sub_socket);
178 test_context_socket_close (pub_socket);
179
180 return recv_count;
181}
182
183// hwm should apply to the messages that have already been received
184// with hwm 11024: send 9999 msg, receive 9999, send 1100, receive 1100
185void test_reset_hwm ()
186{
187 const int first_count = 9999;
188 const int second_count = 1100;
189 int hwm = 11024;
190 char my_endpoint[SOCKET_STRING_LEN];
191
192 // Set up bind socket
193 void *pub_socket = test_context_socket (ZMQ_PUB);
194 TEST_ASSERT_SUCCESS_ERRNO (
195 zmq_setsockopt (pub_socket, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
196 bind_loopback_ipv4 (pub_socket, my_endpoint, MAX_SOCKET_STRING);
197
198 // Set up connect socket
199 void *sub_socket = test_context_socket (ZMQ_SUB);
200 TEST_ASSERT_SUCCESS_ERRNO (
201 zmq_setsockopt (sub_socket, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
202 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_socket, my_endpoint));
203 TEST_ASSERT_SUCCESS_ERRNO (
204 zmq_setsockopt (sub_socket, ZMQ_SUBSCRIBE, 0, 0));
205
206 msleep (SETTLE_TIME);
207
208 // Send messages
209 int send_count = 0;
210 while (send_count < first_count
211 && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
212 ++send_count;
213 TEST_ASSERT_EQUAL_INT (first_count, send_count);
214
215 msleep (SETTLE_TIME);
216
217 // Now receive all sent messages
218 int recv_count = 0;
219 while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) {
220 ++recv_count;
221 }
222 TEST_ASSERT_EQUAL_INT (first_count, recv_count);
223
224 msleep (SETTLE_TIME);
225
226 // Send messages
227 send_count = 0;
228 while (send_count < second_count
229 && zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
230 ++send_count;
231 TEST_ASSERT_EQUAL_INT (second_count, send_count);
232
233 msleep (SETTLE_TIME);
234
235 // Now receive all sent messages
236 recv_count = 0;
237 while (0 == zmq_recv (sub_socket, NULL, 0, ZMQ_DONTWAIT)) {
238 ++recv_count;
239 }
240 TEST_ASSERT_EQUAL_INT (second_count, recv_count);
241
242 // Clean up
243 test_context_socket_close (sub_socket);
244 test_context_socket_close (pub_socket);
245}
246
247void test_defaults_large (const char *bind_endpoint_)
248{
249 // send 1000 msg on hwm 1000, receive 1000
250 TEST_ASSERT_EQUAL_INT (1000, test_defaults (1000, 1000, bind_endpoint_));
251}
252
253void test_defaults_small (const char *bind_endpoint_)
254{
255 // send 1000 msg on hwm 100, receive 100
256 TEST_ASSERT_EQUAL_INT (100, test_defaults (100, 100, bind_endpoint_));
257}
258
259void test_blocking (const char *bind_endpoint_)
260{
261 // send 6000 msg on hwm 2000, drops above hwm, only receive hwm:
262 TEST_ASSERT_EQUAL_INT (6000, test_blocking (2000, 6000, bind_endpoint_));
263}
264
265#define DEFINE_REGULAR_TEST_CASES(name, bind_endpoint) \
266 void test_defaults_large_##name () \
267 { \
268 test_defaults_large (bind_endpoint); \
269 } \
270 \
271 void test_defaults_small_##name () \
272 { \
273 test_defaults_small (bind_endpoint); \
274 } \
275 \
276 void test_blocking_##name () { test_blocking (bind_endpoint); }
277
278#define RUN_REGULAR_TEST_CASES(name) \
279 RUN_TEST (test_defaults_large_##name); \
280 RUN_TEST (test_defaults_small_##name); \
281 RUN_TEST (test_blocking_##name)
282
283DEFINE_REGULAR_TEST_CASES (tcp, "tcp://127.0.0.1:*")
284DEFINE_REGULAR_TEST_CASES (inproc, "inproc://a")
285
286#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_GNU)
287DEFINE_REGULAR_TEST_CASES (ipc, "ipc://*")
288#endif
289
290int main ()
291{
292 setup_test_environment ();
293
294 UNITY_BEGIN ();
295
296 RUN_REGULAR_TEST_CASES (tcp);
297 RUN_REGULAR_TEST_CASES (inproc);
298
299#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_GNU)
300 RUN_REGULAR_TEST_CASES (ipc);
301#endif
302 RUN_TEST (test_reset_hwm);
303 return UNITY_END ();
304}
305