1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33SETUP_TEARDOWN_TESTCONTEXT
34
35const int MAX_SENDS = 10000;
36
37void test_change_before_connected ()
38{
39 int rc;
40
41 void *bind_socket = test_context_socket (ZMQ_PUSH);
42 void *connect_socket = test_context_socket (ZMQ_PULL);
43
44 int val = 2;
45 rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof (val));
46 TEST_ASSERT_EQUAL_INT (0, rc);
47 rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &val, sizeof (val));
48 TEST_ASSERT_EQUAL_INT (0, rc);
49
50 zmq_connect (connect_socket, "inproc://a");
51 zmq_bind (bind_socket, "inproc://a");
52
53 size_t placeholder = sizeof (val);
54 val = 0;
55 rc = zmq_getsockopt (bind_socket, ZMQ_SNDHWM, &val, &placeholder);
56 TEST_ASSERT_EQUAL_INT (0, rc);
57 TEST_ASSERT_EQUAL_INT (2, val);
58
59 int send_count = 0;
60 while (send_count < MAX_SENDS
61 && zmq_send (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
62 ++send_count;
63
64 TEST_ASSERT_EQUAL_INT (4, send_count);
65
66 test_context_socket_close (bind_socket);
67 test_context_socket_close (connect_socket);
68}
69
70void test_change_after_connected ()
71{
72 int rc;
73
74 void *bind_socket = test_context_socket (ZMQ_PUSH);
75 void *connect_socket = test_context_socket (ZMQ_PULL);
76
77 int val = 1;
78 rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof (val));
79 TEST_ASSERT_EQUAL_INT (0, rc);
80 rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &val, sizeof (val));
81 TEST_ASSERT_EQUAL_INT (0, rc);
82
83 zmq_connect (connect_socket, "inproc://a");
84 zmq_bind (bind_socket, "inproc://a");
85
86 val = 5;
87 rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &val, sizeof (val));
88 TEST_ASSERT_EQUAL_INT (0, rc);
89
90 size_t placeholder = sizeof (val);
91 val = 0;
92 rc = zmq_getsockopt (bind_socket, ZMQ_SNDHWM, &val, &placeholder);
93 TEST_ASSERT_EQUAL_INT (0, rc);
94 TEST_ASSERT_EQUAL_INT (5, val);
95
96 int send_count = 0;
97 while (send_count < MAX_SENDS
98 && zmq_send (bind_socket, NULL, 0, ZMQ_DONTWAIT) == 0)
99 ++send_count;
100
101 TEST_ASSERT_EQUAL_INT (6, send_count);
102
103 test_context_socket_close (bind_socket);
104 test_context_socket_close (connect_socket);
105}
106
107int send_until_wouldblock (void *socket_)
108{
109 int send_count = 0;
110 while (send_count < MAX_SENDS
111 && zmq_send (socket_, &send_count, sizeof (send_count), ZMQ_DONTWAIT)
112 == sizeof (send_count)) {
113 ++send_count;
114 }
115 return send_count;
116}
117
118int test_fill_up_to_hwm (void *socket_, int sndhwm_)
119{
120 int send_count = send_until_wouldblock (socket_);
121 fprintf (stderr, "sndhwm==%i, send_count==%i\n", sndhwm_, send_count);
122 TEST_ASSERT_LESS_OR_EQUAL_INT (sndhwm_ + 1, send_count);
123 TEST_ASSERT_GREATER_THAN_INT (sndhwm_ / 10, send_count);
124 return send_count;
125}
126
127void test_decrease_when_full ()
128{
129 int rc;
130
131 void *bind_socket = test_context_socket (ZMQ_PUSH);
132 void *connect_socket = test_context_socket (ZMQ_PULL);
133
134 int val = 1;
135 rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof (val));
136 TEST_ASSERT_EQUAL_INT (0, rc);
137
138 int sndhwm = 100;
139 rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm));
140 TEST_ASSERT_EQUAL_INT (0, rc);
141
142 zmq_bind (bind_socket, "inproc://a");
143 zmq_connect (connect_socket, "inproc://a");
144
145 // we must wait for the connect to succeed here, unfortunately we don't
146 // have monitoring events for inproc, so we just hope SETTLE_TIME suffices
147 msleep (SETTLE_TIME);
148
149 // Fill up to hwm
150 int send_count = test_fill_up_to_hwm (bind_socket, sndhwm);
151
152 // Decrease snd hwm
153 sndhwm = 70;
154 rc = zmq_setsockopt (bind_socket, ZMQ_SNDHWM, &sndhwm, sizeof (sndhwm));
155 TEST_ASSERT_EQUAL_INT (0, rc);
156
157 int sndhwm_read = 0;
158 size_t sndhwm_read_size = sizeof (sndhwm_read);
159 rc =
160 zmq_getsockopt (bind_socket, ZMQ_SNDHWM, &sndhwm_read, &sndhwm_read_size);
161 TEST_ASSERT_EQUAL_INT (0, rc);
162 TEST_ASSERT_EQUAL_INT (sndhwm, sndhwm_read);
163
164 msleep (SETTLE_TIME);
165
166 // Read out all data (should get up to previous hwm worth so none were dropped)
167 int read_count = 0;
168 int read_data = 0;
169 while (
170 read_count < MAX_SENDS
171 && zmq_recv (connect_socket, &read_data, sizeof (read_data), ZMQ_DONTWAIT)
172 == sizeof (read_data)) {
173 TEST_ASSERT_EQUAL_INT (read_data, read_count);
174 ++read_count;
175 }
176
177 TEST_ASSERT_EQUAL_INT (send_count, read_count);
178
179 // Give io thread some time to catch up
180 msleep (SETTLE_TIME);
181
182 // Fill up to new hwm
183 test_fill_up_to_hwm (bind_socket, sndhwm);
184
185 test_context_socket_close (bind_socket);
186 test_context_socket_close (connect_socket);
187}
188
189
190int main ()
191{
192 setup_test_environment ();
193
194 UNITY_BEGIN ();
195 RUN_TEST (test_change_before_connected);
196 RUN_TEST (test_change_after_connected);
197 RUN_TEST (test_decrease_when_full);
198
199 return UNITY_END ();
200}
201