1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_monitoring.hpp"
32
33#include "testutil_unity.hpp"
34
35#include <stdlib.h>
36#include <string.h>
37
38SETUP_TEARDOWN_TESTCONTEXT
39
40void test_monitor_invalid_protocol_fails ()
41{
42 void *client = test_context_socket (ZMQ_DEALER);
43
44 // Socket monitoring only works over inproc://
45 TEST_ASSERT_FAILURE_ERRNO (
46 EPROTONOSUPPORT, zmq_socket_monitor (client, "tcp://127.0.0.1:*", 0));
47
48#ifdef ZMQ_EVENT_PIPES_STATS
49 // Stats command needs to be called on a valid socket with monitoring
50 // enabled
51 TEST_ASSERT_FAILURE_ERRNO (ENOTSOCK, zmq_socket_monitor_pipes_stats (NULL));
52 TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_socket_monitor_pipes_stats (client));
53#endif
54
55 test_context_socket_close_zero_linger (client);
56}
57
58void test_monitor_basic ()
59{
60 char my_endpoint[MAX_SOCKET_STRING];
61
62 // We'll monitor these two sockets
63 void *client = test_context_socket (ZMQ_DEALER);
64 void *server = test_context_socket (ZMQ_DEALER);
65
66 // Monitor all events on client and server sockets
67 TEST_ASSERT_SUCCESS_ERRNO (
68 zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL));
69 TEST_ASSERT_SUCCESS_ERRNO (
70 zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL));
71
72 // Create two sockets for collecting monitor events
73 void *client_mon = test_context_socket (ZMQ_PAIR);
74 void *server_mon = test_context_socket (ZMQ_PAIR);
75
76 // Connect these to the inproc endpoints so they'll get events
77 TEST_ASSERT_SUCCESS_ERRNO (
78 zmq_connect (client_mon, "inproc://monitor-client"));
79 TEST_ASSERT_SUCCESS_ERRNO (
80 zmq_connect (server_mon, "inproc://monitor-server"));
81
82 // Now do a basic ping test
83 bind_loopback_ipv4 (server, my_endpoint, sizeof my_endpoint);
84
85 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint));
86 bounce (server, client);
87
88 // Close client and server
89 // TODO why does this use zero_linger?
90 test_context_socket_close_zero_linger (client);
91 test_context_socket_close_zero_linger (server);
92
93 // Now collect and check events from both sockets
94 int event = get_monitor_event (client_mon, NULL, NULL);
95 if (event == ZMQ_EVENT_CONNECT_DELAYED)
96 event = get_monitor_event (client_mon, NULL, NULL);
97 TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_CONNECTED, event);
98 expect_monitor_event (client_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
99 event = get_monitor_event (client_mon, NULL, NULL);
100 if (event == ZMQ_EVENT_DISCONNECTED) {
101 expect_monitor_event (client_mon, ZMQ_EVENT_CONNECT_RETRIED);
102 expect_monitor_event (client_mon, ZMQ_EVENT_MONITOR_STOPPED);
103 } else
104 TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
105
106 // This is the flow of server events
107 expect_monitor_event (server_mon, ZMQ_EVENT_LISTENING);
108 expect_monitor_event (server_mon, ZMQ_EVENT_ACCEPTED);
109 expect_monitor_event (server_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
110 event = get_monitor_event (server_mon, NULL, NULL);
111 // Sometimes the server sees the client closing before it gets closed.
112 if (event != ZMQ_EVENT_DISCONNECTED) {
113 TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_CLOSED, event);
114 event = get_monitor_event (server_mon, NULL, NULL);
115 }
116 if (event != ZMQ_EVENT_DISCONNECTED) {
117 TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
118 }
119
120 // Close down the sockets
121 // TODO why does this use zero_linger?
122 test_context_socket_close_zero_linger (client_mon);
123 test_context_socket_close_zero_linger (server_mon);
124}
125
126#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
127 || (defined ZMQ_CURRENT_EVENT_VERSION \
128 && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
129void test_monitor_versioned_invalid_socket_type ()
130{
131 void *client = test_context_socket (ZMQ_DEALER);
132
133 // Socket monitoring only works with ZMQ_PAIR, ZMQ_PUB and ZMQ_PUSH.
134 TEST_ASSERT_FAILURE_ERRNO (
135 EINVAL, zmq_socket_monitor_versioned (
136 client, "inproc://invalid-socket-type", 0, 2, ZMQ_CLIENT));
137
138 test_context_socket_close_zero_linger (client);
139}
140
141void test_monitor_versioned_basic (bind_function_t bind_function_,
142 const char *expected_prefix_,
143 int type_)
144{
145 char server_endpoint[MAX_SOCKET_STRING];
146 char client_mon_endpoint[MAX_SOCKET_STRING];
147 char server_mon_endpoint[MAX_SOCKET_STRING];
148
149 // Create a unique endpoint for each call so we don't have
150 // to wait for the sockets to unbind.
151 snprintf (client_mon_endpoint, MAX_SOCKET_STRING, "inproc://client%s%d",
152 expected_prefix_, type_);
153 snprintf (server_mon_endpoint, MAX_SOCKET_STRING, "inproc://server%s%d",
154 expected_prefix_, type_);
155
156 // We'll monitor these two sockets
157 void *client = test_context_socket (ZMQ_DEALER);
158 void *server = test_context_socket (ZMQ_DEALER);
159
160 // Monitor all events on client and server sockets
161 TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
162 client, client_mon_endpoint, ZMQ_EVENT_ALL_V2, 2, type_));
163 TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
164 server, server_mon_endpoint, ZMQ_EVENT_ALL_V2, 2, type_));
165
166 // Choose the appropriate consumer socket type.
167 int mon_type = ZMQ_PAIR;
168 switch (type_) {
169 case ZMQ_PAIR:
170 mon_type = ZMQ_PAIR;
171 break;
172 case ZMQ_PUSH:
173 mon_type = ZMQ_PULL;
174 break;
175 case ZMQ_PUB:
176 mon_type = ZMQ_SUB;
177 break;
178 }
179
180 // Create two sockets for collecting monitor events
181 void *client_mon = test_context_socket (mon_type);
182 void *server_mon = test_context_socket (mon_type);
183
184 // Additionally subscribe to all events if a PUB socket is used.
185 if (type_ == ZMQ_PUB) {
186 TEST_ASSERT_SUCCESS_ERRNO (
187 zmq_setsockopt (client_mon, ZMQ_SUBSCRIBE, "", 0));
188 TEST_ASSERT_SUCCESS_ERRNO (
189 zmq_setsockopt (server_mon, ZMQ_SUBSCRIBE, "", 0));
190 }
191
192 // Connect these to the inproc endpoints so they'll get events
193 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client_mon, client_mon_endpoint));
194 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (server_mon, server_mon_endpoint));
195
196 // Now do a basic ping test
197 bind_function_ (server, server_endpoint, sizeof server_endpoint);
198
199 int ipv6;
200 size_t ipv6_size = sizeof (ipv6);
201 TEST_ASSERT_SUCCESS_ERRNO (
202 zmq_getsockopt (server, ZMQ_IPV6, &ipv6, &ipv6_size));
203 TEST_ASSERT_SUCCESS_ERRNO (
204 zmq_setsockopt (client, ZMQ_IPV6, &ipv6, sizeof (int)));
205 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, server_endpoint));
206 bounce (server, client);
207
208 // Close client and server
209 // TODO why does this use zero_linger?
210 test_context_socket_close_zero_linger (client);
211 test_context_socket_close_zero_linger (server);
212
213 char *client_local_address = NULL;
214 char *client_remote_address = NULL;
215
216 // Now collect and check events from both sockets
217 int64_t event = get_monitor_event_v2 (
218 client_mon, NULL, &client_local_address, &client_remote_address);
219 if (event == ZMQ_EVENT_CONNECT_DELAYED) {
220 free (client_local_address);
221 free (client_remote_address);
222 event = get_monitor_event_v2 (client_mon, NULL, &client_local_address,
223 &client_remote_address);
224 }
225 TEST_ASSERT_EQUAL (ZMQ_EVENT_CONNECTED, event);
226 TEST_ASSERT_EQUAL_STRING (server_endpoint, client_remote_address);
227 TEST_ASSERT_EQUAL_STRING_LEN (expected_prefix_, client_local_address,
228 strlen (expected_prefix_));
229 TEST_ASSERT_NOT_EQUAL (
230 0, strcmp (client_local_address, client_remote_address));
231
232 expect_monitor_event_v2 (client_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED,
233 client_local_address, client_remote_address);
234 event = get_monitor_event_v2 (client_mon, NULL, NULL, NULL);
235 if (event == ZMQ_EVENT_DISCONNECTED) {
236 expect_monitor_event_v2 (client_mon, ZMQ_EVENT_CONNECT_RETRIED,
237 client_local_address, client_remote_address);
238 expect_monitor_event_v2 (client_mon, ZMQ_EVENT_MONITOR_STOPPED, "", "");
239 } else
240 TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
241
242 // This is the flow of server events
243 expect_monitor_event_v2 (server_mon, ZMQ_EVENT_LISTENING,
244 client_remote_address, "");
245 expect_monitor_event_v2 (server_mon, ZMQ_EVENT_ACCEPTED,
246 client_remote_address, client_local_address);
247 expect_monitor_event_v2 (server_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED,
248 client_remote_address, client_local_address);
249 event = get_monitor_event_v2 (server_mon, NULL, NULL, NULL);
250 // Sometimes the server sees the client closing before it gets closed.
251 if (event != ZMQ_EVENT_DISCONNECTED) {
252 TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_CLOSED, event);
253 event = get_monitor_event_v2 (server_mon, NULL, NULL, NULL);
254 }
255 if (event != ZMQ_EVENT_DISCONNECTED) {
256 TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
257 }
258 free (client_local_address);
259 free (client_remote_address);
260
261 // Close down the sockets
262 // TODO why does this use zero_linger?
263 test_context_socket_close_zero_linger (client_mon);
264 test_context_socket_close_zero_linger (server_mon);
265}
266
267void test_monitor_versioned_basic_tcp_ipv4 ()
268{
269 static const char prefix[] = "tcp://127.0.0.1:";
270 test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PAIR);
271 test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PUB);
272 test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PUSH);
273}
274
275void test_monitor_versioned_basic_tcp_ipv6 ()
276{
277 static const char prefix[] = "tcp://[::1]:";
278 test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PAIR);
279 test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PUB);
280 test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PUSH);
281}
282
283void test_monitor_versioned_basic_ipc ()
284{
285 static const char prefix[] = "ipc://";
286 test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PAIR);
287 test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PUB);
288 test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PUSH);
289}
290
291void test_monitor_versioned_basic_tipc ()
292{
293 static const char prefix[] = "tipc://";
294 test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PAIR);
295 test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PUB);
296 test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PUSH);
297}
298
299#ifdef ZMQ_EVENT_PIPES_STATS
300void test_monitor_versioned_stats (bind_function_t bind_function_,
301 const char *expected_prefix_)
302{
303 char server_endpoint[MAX_SOCKET_STRING];
304 const int pulls_count = 4;
305 void *pulls[pulls_count];
306
307 // We'll monitor these two sockets
308 void *push = test_context_socket (ZMQ_PUSH);
309
310 TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
311 push, "inproc://monitor-push", ZMQ_EVENT_PIPES_STATS, 2, ZMQ_PAIR));
312
313 // Should fail if there are no pipes to monitor
314 TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_socket_monitor_pipes_stats (push));
315
316 void *push_mon = test_context_socket (ZMQ_PAIR);
317
318 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (push_mon, "inproc://monitor-push"));
319
320 // Set lower HWM - queues will be filled so we should see it in the stats
321 int send_hwm = 500;
322 TEST_ASSERT_SUCCESS_ERRNO (
323 zmq_setsockopt (push, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)));
324 // Set very low TCP buffers so that messages cannot be stored in-flight
325 const int tcp_buffer_size = 4096;
326 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
327 push, ZMQ_SNDBUF, &tcp_buffer_size, sizeof (tcp_buffer_size)));
328 bind_function_ (push, server_endpoint, sizeof (server_endpoint));
329
330 int ipv6;
331 size_t ipv6_size = sizeof (ipv6);
332 TEST_ASSERT_SUCCESS_ERRNO (
333 zmq_getsockopt (push, ZMQ_IPV6, &ipv6, &ipv6_size));
334 for (int i = 0; i < pulls_count; ++i) {
335 pulls[i] = test_context_socket (ZMQ_PULL);
336 TEST_ASSERT_SUCCESS_ERRNO (
337 zmq_setsockopt (pulls[i], ZMQ_IPV6, &ipv6, sizeof (int)));
338 int timeout_ms = 10;
339 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
340 pulls[i], ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms)));
341 TEST_ASSERT_SUCCESS_ERRNO (
342 zmq_setsockopt (pulls[i], ZMQ_RCVHWM, &send_hwm, sizeof (send_hwm)));
343 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
344 pulls[i], ZMQ_RCVBUF, &tcp_buffer_size, sizeof (tcp_buffer_size)));
345 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pulls[i], server_endpoint));
346 }
347
348 // Send until we block
349 int send_count = 0;
350 // Saturate the TCP buffers too
351 char data[tcp_buffer_size * 2];
352 memset (data, 0, sizeof (data));
353 // Saturate all pipes - send + receive - on all connections
354 while (send_count < send_hwm * 2 * pulls_count) {
355 TEST_ASSERT_EQUAL_INT (sizeof (data),
356 zmq_send (push, data, sizeof (data), 0));
357 ++send_count;
358 }
359
360 // Drain one of the pulls - doesn't matter how many messages, at least one
361 send_count = send_count / 4;
362 do {
363 zmq_recv (pulls[0], data, sizeof (data), 0);
364 --send_count;
365 } while (send_count > 0);
366
367 // To kick the application thread, do a dummy getsockopt - users here
368 // should use the monitor and the other sockets in a poll.
369 unsigned long int dummy;
370 size_t dummy_size = sizeof (dummy);
371 msleep (SETTLE_TIME);
372 // Note that the pipe stats on the sender will not get updated until the
373 // receiver has processed at least lwm ((hwm + 1) / 2) messages AND until
374 // the application thread has ran through the mailbox, as the update is
375 // delivered via a message (send_activate_write)
376 zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size);
377
378 // Ask for stats and check that they match
379 zmq_socket_monitor_pipes_stats (push);
380
381 msleep (SETTLE_TIME);
382 zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size);
383
384 for (int i = 0; i < pulls_count; ++i) {
385 char *push_local_address = NULL;
386 char *push_remote_address = NULL;
387 uint64_t queue_stat[2];
388 int64_t event = get_monitor_event_v2 (
389 push_mon, queue_stat, &push_local_address, &push_remote_address);
390 TEST_ASSERT_EQUAL_STRING (server_endpoint, push_local_address);
391 TEST_ASSERT_EQUAL_STRING_LEN (expected_prefix_, push_remote_address,
392 strlen (expected_prefix_));
393 TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_PIPES_STATS, event);
394 TEST_ASSERT_EQUAL_INT (i == 0 ? 0 : send_hwm, queue_stat[0]);
395 TEST_ASSERT_EQUAL_INT (0, queue_stat[1]);
396 free (push_local_address);
397 free (push_remote_address);
398 }
399
400 // Close client and server
401 test_context_socket_close_zero_linger (push_mon);
402 test_context_socket_close_zero_linger (push);
403 for (int i = 0; i < pulls_count; ++i)
404 test_context_socket_close_zero_linger (pulls[i]);
405}
406
407void test_monitor_versioned_stats_tcp_ipv4 ()
408{
409 static const char prefix[] = "tcp://127.0.0.1:";
410 test_monitor_versioned_stats (bind_loopback_ipv4, prefix);
411}
412
413void test_monitor_versioned_stats_tcp_ipv6 ()
414{
415 static const char prefix[] = "tcp://[::1]:";
416 test_monitor_versioned_stats (bind_loopback_ipv6, prefix);
417}
418
419void test_monitor_versioned_stats_ipc ()
420{
421 static const char prefix[] = "ipc://";
422 test_monitor_versioned_stats (bind_loopback_ipc, prefix);
423}
424#endif // ZMQ_EVENT_PIPES_STATS
425#endif
426
427int main ()
428{
429 setup_test_environment ();
430
431 UNITY_BEGIN ();
432 RUN_TEST (test_monitor_invalid_protocol_fails);
433 RUN_TEST (test_monitor_basic);
434
435#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
436 || (defined ZMQ_CURRENT_EVENT_VERSION \
437 && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
438 RUN_TEST (test_monitor_versioned_invalid_socket_type);
439 RUN_TEST (test_monitor_versioned_basic_tcp_ipv4);
440 RUN_TEST (test_monitor_versioned_basic_tcp_ipv6);
441 RUN_TEST (test_monitor_versioned_basic_ipc);
442 RUN_TEST (test_monitor_versioned_basic_tipc);
443#ifdef ZMQ_EVENT_PIPES_STATS
444 RUN_TEST (test_monitor_versioned_stats_tcp_ipv4);
445 RUN_TEST (test_monitor_versioned_stats_tcp_ipv6);
446 RUN_TEST (test_monitor_versioned_stats_ipc);
447#endif
448#endif
449
450 return UNITY_END ();
451}
452