1/*
2 Copyright (c) 2007-2019 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29#include "testutil_monitoring.hpp"
30#include "testutil_unity.hpp"
31
32#include <stdlib.h>
33#include <string.h>
34
35// Read one event off the monitor socket; return value and address
36// by reference, if not null, and event number by value. Returns -1
37// in case of error.
38
39static int get_monitor_event_internal (void *monitor_,
40 int *value_,
41 char **address_,
42 int recv_flag_)
43{
44 // First frame in message contains event number and value
45 zmq_msg_t msg;
46 zmq_msg_init (&msg);
47 if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
48 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
49 return -1; // timed out or no message available
50 }
51 TEST_ASSERT_TRUE (zmq_msg_more (&msg));
52
53 uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
54 uint16_t event = *reinterpret_cast<uint16_t *> (data);
55 if (value_)
56 memcpy (value_, data + 2, sizeof (uint32_t));
57
58 // Second frame in message contains event address
59 zmq_msg_init (&msg);
60 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, monitor_, recv_flag_));
61 TEST_ASSERT_FALSE (zmq_msg_more (&msg));
62
63 if (address_) {
64 uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
65 size_t size = zmq_msg_size (&msg);
66 *address_ = static_cast<char *> (malloc (size + 1));
67 memcpy (*address_, data, size);
68 (*address_)[size] = 0;
69 }
70 return event;
71}
72
73int get_monitor_event_with_timeout (void *monitor_,
74 int *value_,
75 char **address_,
76 int timeout_)
77{
78 int res;
79 if (timeout_ == -1) {
80 // process infinite timeout in small steps to allow the user
81 // to see some information on the console
82
83 int timeout_step = 250;
84 int wait_time = 0;
85 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_step,
86 sizeof (timeout_step));
87 while (
88 (res = get_monitor_event_internal (monitor_, value_, address_, 0))
89 == -1) {
90 wait_time += timeout_step;
91 fprintf (stderr, "Still waiting for monitor event after %i ms\n",
92 wait_time);
93 }
94 } else {
95 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_, sizeof (timeout_));
96 res = get_monitor_event_internal (monitor_, value_, address_, 0);
97 }
98 int timeout_infinite = -1;
99 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_infinite,
100 sizeof (timeout_infinite));
101 return res;
102}
103
104int get_monitor_event (void *monitor_, int *value_, char **address_)
105{
106 return get_monitor_event_with_timeout (monitor_, value_, address_, -1);
107}
108
109void expect_monitor_event (void *monitor_, int expected_event_)
110{
111 TEST_ASSERT_EQUAL_HEX (expected_event_,
112 get_monitor_event (monitor_, NULL, NULL));
113}
114
115static void print_unexpected_event (char *buf_,
116 size_t buf_size_,
117 int event_,
118 int err_,
119 int expected_event_,
120 int expected_err_)
121{
122 snprintf (buf_, buf_size_,
123 "Unexpected event: 0x%x, value = %i/0x%x (expected: 0x%x, value "
124 "= %i/0x%x)\n",
125 event_, err_, err_, expected_event_, expected_err_,
126 expected_err_);
127}
128
129void print_unexpected_event_stderr (int event_,
130 int err_,
131 int expected_event_,
132 int expected_err_)
133{
134 char buf[256];
135 print_unexpected_event (buf, sizeof buf, event_, err_, expected_event_,
136 expected_err_);
137 fputs (buf, stderr);
138}
139
140int expect_monitor_event_multiple (void *server_mon_,
141 int expected_event_,
142 int expected_err_,
143 bool optional_)
144{
145 int count_of_expected_events = 0;
146 int client_closed_connection = 0;
147 int timeout = 250;
148 int wait_time = 0;
149
150 int event;
151 int err;
152 while ((event =
153 get_monitor_event_with_timeout (server_mon_, &err, NULL, timeout))
154 != -1
155 || !count_of_expected_events) {
156 if (event == -1) {
157 if (optional_)
158 break;
159 wait_time += timeout;
160 fprintf (stderr,
161 "Still waiting for first event after %ims (expected event "
162 "%x (value %i/0x%x))\n",
163 wait_time, expected_event_, expected_err_, expected_err_);
164 continue;
165 }
166 // ignore errors with EPIPE/ECONNRESET/ECONNABORTED, which can happen
167 // ECONNRESET can happen on very slow machines, when the engine writes
168 // to the peer and then tries to read the socket before the peer reads
169 // ECONNABORTED happens when a client aborts a connection via RST/timeout
170 if (event == ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL
171 && ((err == EPIPE && expected_err_ != EPIPE) || err == ECONNRESET
172 || err == ECONNABORTED)) {
173 fprintf (stderr,
174 "Ignored event (skipping any further events): %x (err = "
175 "%i == %s)\n",
176 event, err, zmq_strerror (err));
177 client_closed_connection = 1;
178 break;
179 }
180 if (event != expected_event_
181 || (-1 != expected_err_ && err != expected_err_)) {
182 char buf[256];
183 print_unexpected_event (buf, sizeof buf, event, err,
184 expected_event_, expected_err_);
185 TEST_FAIL_MESSAGE (buf);
186 }
187 ++count_of_expected_events;
188 }
189 TEST_ASSERT_TRUE (optional_ || count_of_expected_events > 0
190 || client_closed_connection);
191
192 return count_of_expected_events;
193}
194
195static int64_t get_monitor_event_internal_v2 (void *monitor_,
196 uint64_t *value_,
197 char **local_address_,
198 char **remote_address_,
199 int recv_flag_)
200{
201 // First frame in message contains event number
202 zmq_msg_t msg;
203 zmq_msg_init (&msg);
204 if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
205 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
206 return -1; // timed out or no message available
207 }
208 TEST_ASSERT_TRUE (zmq_msg_more (&msg));
209 TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
210
211 uint64_t event;
212 memcpy (&event, zmq_msg_data (&msg), sizeof (event));
213 zmq_msg_close (&msg);
214
215 // Second frame in message contains the number of values
216 zmq_msg_init (&msg);
217 if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
218 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
219 return -1; // timed out or no message available
220 }
221 TEST_ASSERT_TRUE (zmq_msg_more (&msg));
222 TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
223
224 uint64_t value_count;
225 memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count));
226 zmq_msg_close (&msg);
227
228 for (uint64_t i = 0; i < value_count; ++i) {
229 // Subsequent frames in message contain event values
230 zmq_msg_init (&msg);
231 if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
232 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, -1);
233 return -1; // timed out or no message available
234 }
235 TEST_ASSERT_TRUE (zmq_msg_more (&msg));
236 TEST_ASSERT_EQUAL_UINT (sizeof (uint64_t), zmq_msg_size (&msg));
237
238 if (value_ && value_ + i)
239 memcpy (value_ + i, zmq_msg_data (&msg), sizeof (*value_));
240 zmq_msg_close (&msg);
241 }
242
243 // Second-to-last frame in message contains local address
244 zmq_msg_init (&msg);
245 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, monitor_, recv_flag_));
246 TEST_ASSERT_TRUE (zmq_msg_more (&msg));
247
248 if (local_address_) {
249 uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
250 size_t size = zmq_msg_size (&msg);
251 *local_address_ = static_cast<char *> (malloc (size + 1));
252 memcpy (*local_address_, data, size);
253 (*local_address_)[size] = 0;
254 }
255 zmq_msg_close (&msg);
256
257 // Last frame in message contains remote address
258 zmq_msg_init (&msg);
259 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, monitor_, recv_flag_));
260 TEST_ASSERT_TRUE (!zmq_msg_more (&msg));
261
262 if (remote_address_) {
263 uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
264 size_t size = zmq_msg_size (&msg);
265 *remote_address_ = static_cast<char *> (malloc (size + 1));
266 memcpy (*remote_address_, data, size);
267 (*remote_address_)[size] = 0;
268 }
269 zmq_msg_close (&msg);
270 return event;
271}
272
273static int64_t get_monitor_event_with_timeout_v2 (void *monitor_,
274 uint64_t *value_,
275 char **local_address_,
276 char **remote_address_,
277 int timeout_)
278{
279 int64_t res;
280 if (timeout_ == -1) {
281 // process infinite timeout in small steps to allow the user
282 // to see some information on the console
283
284 int timeout_step = 250;
285 int wait_time = 0;
286 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_step,
287 sizeof (timeout_step));
288 while ((res = get_monitor_event_internal_v2 (
289 monitor_, value_, local_address_, remote_address_, 0))
290 == -1) {
291 wait_time += timeout_step;
292 fprintf (stderr, "Still waiting for monitor event after %i ms\n",
293 wait_time);
294 }
295 } else {
296 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_, sizeof (timeout_));
297 res = get_monitor_event_internal_v2 (monitor_, value_, local_address_,
298 remote_address_, 0);
299 }
300 int timeout_infinite = -1;
301 zmq_setsockopt (monitor_, ZMQ_RCVTIMEO, &timeout_infinite,
302 sizeof (timeout_infinite));
303 return res;
304}
305
306int64_t get_monitor_event_v2 (void *monitor_,
307 uint64_t *value_,
308 char **local_address_,
309 char **remote_address_)
310{
311 return get_monitor_event_with_timeout_v2 (monitor_, value_, local_address_,
312 remote_address_, -1);
313}
314
315void expect_monitor_event_v2 (void *monitor_,
316 int64_t expected_event_,
317 const char *expected_local_address_,
318 const char *expected_remote_address_)
319{
320 char *local_address = NULL;
321 char *remote_address = NULL;
322 int64_t event = get_monitor_event_v2 (
323 monitor_, NULL, expected_local_address_ ? &local_address : NULL,
324 expected_remote_address_ ? &remote_address : NULL);
325 bool failed = false;
326 char buf[256];
327 char *pos = buf;
328 if (event != expected_event_) {
329 pos += snprintf (pos, sizeof buf - (pos - buf),
330 "Expected monitor event %llx, but received %llx\n",
331 static_cast<long long> (expected_event_),
332 static_cast<long long> (event));
333 failed = true;
334 }
335 if (expected_local_address_
336 && 0 != strcmp (local_address, expected_local_address_)) {
337 pos += snprintf (pos, sizeof buf - (pos - buf),
338 "Expected local address %s, but received %s\n",
339 expected_local_address_, local_address);
340 }
341 if (expected_remote_address_
342 && 0 != strcmp (remote_address, expected_remote_address_)) {
343 snprintf (pos, sizeof buf - (pos - buf),
344 "Expected remote address %s, but received %s\n",
345 expected_remote_address_, remote_address);
346 }
347 free (local_address);
348 free (remote_address);
349 TEST_ASSERT_FALSE_MESSAGE (failed, buf);
350}
351