1 | /* |
2 | Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of 0MQ. |
5 | |
6 | 0MQ is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License as published by |
8 | the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | 0MQ is distributed in the hope that it will be useful, |
12 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | GNU Lesser General Public License for more details. |
15 | |
16 | You should have received a copy of the GNU Lesser General Public License |
17 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | */ |
19 | |
20 | #include "testutil.hpp" |
21 | #if defined(ZMQ_HAVE_WINDOWS) |
22 | #include <winsock2.h> |
23 | #include <ws2tcpip.h> |
24 | #include <stdexcept> |
25 | #define close closesocket |
26 | typedef SOCKET raw_socket; |
27 | #else |
28 | #include <arpa/inet.h> |
29 | #include <unistd.h> |
30 | typedef int raw_socket; |
31 | #endif |
32 | |
33 | #include <limits.h> |
34 | #include <stdlib.h> |
35 | #include <string.h> |
36 | |
37 | // TODO remove this here, either ensure that UINT16_MAX is always properly |
38 | // defined or handle this at a more central location |
39 | #ifndef UINT16_MAX |
40 | #define UINT16_MAX 65535 |
41 | #endif |
42 | |
43 | #include "testutil_unity.hpp" |
44 | |
45 | SETUP_TEARDOWN_TESTCONTEXT |
46 | |
47 | // Read one event off the monitor socket; return value and address |
48 | // by reference, if not null, and event number by value. Returns -1 |
49 | // in case of error. |
50 | |
51 | static int get_monitor_event (void *monitor_) |
52 | { |
53 | for (int i = 0; i < 2; i++) { |
54 | // First frame in message contains event number and value |
55 | zmq_msg_t msg; |
56 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); |
57 | if (zmq_msg_recv (&msg, monitor_, ZMQ_DONTWAIT) == -1) { |
58 | msleep (SETTLE_TIME); |
59 | continue; // Interrupted, presumably |
60 | } |
61 | TEST_ASSERT_TRUE (zmq_msg_more (&msg)); |
62 | |
63 | uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg)); |
64 | uint16_t event = *reinterpret_cast<uint16_t *> (data); |
65 | |
66 | // Second frame in message contains event address |
67 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); |
68 | if (zmq_msg_recv (&msg, monitor_, 0) == -1) { |
69 | return -1; // Interrupted, presumably |
70 | } |
71 | TEST_ASSERT_FALSE (zmq_msg_more (&msg)); |
72 | |
73 | return event; |
74 | } |
75 | return -1; |
76 | } |
77 | |
78 | static void recv_with_retry (raw_socket fd_, char *buffer_, int bytes_) |
79 | { |
80 | int received = 0; |
81 | while (true) { |
82 | int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( |
83 | recv (fd_, buffer_ + received, bytes_ - received, 0)); |
84 | TEST_ASSERT_GREATER_THAN_INT (0, rc); |
85 | received += rc; |
86 | TEST_ASSERT_LESS_OR_EQUAL_INT (bytes_, received); |
87 | if (received == bytes_) |
88 | break; |
89 | } |
90 | } |
91 | |
92 | static void mock_handshake (raw_socket fd_, int mock_ping_) |
93 | { |
94 | const uint8_t zmtp_greeting[33] = {0xff, 0, 0, 0, 0, 0, 0, 0, 0, |
95 | 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0}; |
96 | char buffer[128]; |
97 | memset (buffer, 0, sizeof (buffer)); |
98 | memcpy (buffer, zmtp_greeting, sizeof (zmtp_greeting)); |
99 | |
100 | int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 64, 0)); |
101 | TEST_ASSERT_EQUAL_INT (64, rc); |
102 | |
103 | recv_with_retry (fd_, buffer, 64); |
104 | |
105 | const uint8_t zmtp_ready[43] = { |
106 | 4, 41, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', 't', |
107 | '-', 'T', 'y', 'p', 'e', 0, 0, 0, 6, 'D', 'E', 'A', 'L', 'E', 'R', |
108 | 8, 'I', 'd', 'e', 'n', 't', 'i', 't', 'y', 0, 0, 0, 0}; |
109 | |
110 | memset (buffer, 0, sizeof (buffer)); |
111 | memcpy (buffer, zmtp_ready, 43); |
112 | rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 43, 0)); |
113 | TEST_ASSERT_EQUAL_INT (43, rc); |
114 | |
115 | // greeting |
116 | recv_with_retry (fd_, buffer, 43); |
117 | |
118 | if (mock_ping_) { |
119 | // test PING context - should be replicated in the PONG |
120 | // to avoid timeouts, do a bulk send |
121 | const uint8_t zmtp_ping[12] = {4, 10, 4, 'P', 'I', 'N', |
122 | 'G', 0, 0, 'L', 'O', 'L'}; |
123 | uint8_t zmtp_pong[10] = {4, 8, 4, 'P', 'O', 'N', 'G', 'L', 'O', 'L'}; |
124 | memset (buffer, 0, sizeof (buffer)); |
125 | memcpy (buffer, zmtp_ping, 12); |
126 | rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 12, 0)); |
127 | TEST_ASSERT_EQUAL_INT (12, rc); |
128 | |
129 | // test a larger body that won't fit in a small message and should get |
130 | // truncated |
131 | memset (buffer, 'z', sizeof (buffer)); |
132 | memcpy (buffer, zmtp_ping, 12); |
133 | buffer[1] = 65; |
134 | rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 67, 0)); |
135 | TEST_ASSERT_EQUAL_INT (67, rc); |
136 | |
137 | // small pong |
138 | recv_with_retry (fd_, buffer, 10); |
139 | TEST_ASSERT_EQUAL_INT (0, memcmp (zmtp_pong, buffer, 10)); |
140 | // large pong |
141 | recv_with_retry (fd_, buffer, 23); |
142 | uint8_t zmtp_pooong[65] = {4, 21, 4, 'P', 'O', 'N', 'G', 'L', 'O', 'L'}; |
143 | memset (zmtp_pooong + 10, 'z', 55); |
144 | TEST_ASSERT_EQUAL_INT (0, memcmp (zmtp_pooong, buffer, 23)); |
145 | } |
146 | } |
147 | |
148 | static void setup_curve (void *socket_, int is_server_) |
149 | { |
150 | const char *secret_key; |
151 | const char *public_key; |
152 | const char *server_key; |
153 | |
154 | if (is_server_) { |
155 | secret_key = "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6" ; |
156 | public_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7" ; |
157 | server_key = NULL; |
158 | } else { |
159 | secret_key = "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs" ; |
160 | public_key = "Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID" ; |
161 | server_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7" ; |
162 | } |
163 | |
164 | zmq_setsockopt (socket_, ZMQ_CURVE_SECRETKEY, secret_key, |
165 | strlen (secret_key)); |
166 | zmq_setsockopt (socket_, ZMQ_CURVE_PUBLICKEY, public_key, |
167 | strlen (public_key)); |
168 | if (is_server_) |
169 | zmq_setsockopt (socket_, ZMQ_CURVE_SERVER, &is_server_, |
170 | sizeof (is_server_)); |
171 | else |
172 | zmq_setsockopt (socket_, ZMQ_CURVE_SERVERKEY, server_key, |
173 | strlen (server_key)); |
174 | } |
175 | |
176 | static void prep_server_socket (int set_heartbeats_, |
177 | int is_curve_, |
178 | void **server_out_, |
179 | void **mon_out_, |
180 | char *endpoint_, |
181 | size_t ep_length_, |
182 | int socket_type_) |
183 | { |
184 | // We'll be using this socket in raw mode |
185 | void *server = test_context_socket (socket_type_); |
186 | |
187 | int value = 0; |
188 | TEST_ASSERT_SUCCESS_ERRNO ( |
189 | zmq_setsockopt (server, ZMQ_LINGER, &value, sizeof (value))); |
190 | |
191 | if (set_heartbeats_) { |
192 | value = 50; |
193 | TEST_ASSERT_SUCCESS_ERRNO ( |
194 | zmq_setsockopt (server, ZMQ_HEARTBEAT_IVL, &value, sizeof (value))); |
195 | } |
196 | |
197 | if (is_curve_) |
198 | setup_curve (server, 1); |
199 | |
200 | bind_loopback_ipv4 (server, endpoint_, ep_length_); |
201 | |
202 | // Create and connect a socket for collecting monitor events on dealer |
203 | void *server_mon = test_context_socket (ZMQ_PAIR); |
204 | |
205 | TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor ( |
206 | server, "inproc://monitor-dealer" , |
207 | ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_ACCEPTED)); |
208 | |
209 | // Connect to the inproc endpoint so we'll get events |
210 | TEST_ASSERT_SUCCESS_ERRNO ( |
211 | zmq_connect (server_mon, "inproc://monitor-dealer" )); |
212 | |
213 | *server_out_ = server; |
214 | *mon_out_ = server_mon; |
215 | } |
216 | |
217 | // This checks for a broken TCP connection (or, in this case a stuck one |
218 | // where the peer never responds to PINGS). There should be an accepted event |
219 | // then a disconnect event. |
220 | static void test_heartbeat_timeout (int server_type_, int mock_ping_) |
221 | { |
222 | int rc; |
223 | char my_endpoint[MAX_SOCKET_STRING]; |
224 | |
225 | void *server, *server_mon; |
226 | prep_server_socket (!mock_ping_, 0, &server, &server_mon, my_endpoint, |
227 | MAX_SOCKET_STRING, server_type_); |
228 | |
229 | struct sockaddr_in ip4addr; |
230 | raw_socket s; |
231 | |
232 | ip4addr.sin_family = AF_INET; |
233 | ip4addr.sin_port = htons (atoi (strrchr (my_endpoint, ':') + 1)); |
234 | #if defined(ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600) |
235 | ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1" ); |
236 | #else |
237 | inet_pton (AF_INET, "127.0.0.1" , &ip4addr.sin_addr); |
238 | #endif |
239 | |
240 | s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); |
241 | rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( |
242 | connect (s, (struct sockaddr *) &ip4addr, sizeof ip4addr)); |
243 | TEST_ASSERT_GREATER_THAN_INT (-1, rc); |
244 | |
245 | // Mock a ZMTP 3 client so we can forcibly time out a connection |
246 | mock_handshake (s, mock_ping_); |
247 | |
248 | // By now everything should report as connected |
249 | rc = get_monitor_event (server_mon); |
250 | TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc); |
251 | |
252 | if (!mock_ping_) { |
253 | // We should have been disconnected |
254 | rc = get_monitor_event (server_mon); |
255 | TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_DISCONNECTED, rc); |
256 | } |
257 | |
258 | close (s); |
259 | |
260 | test_context_socket_close (server); |
261 | test_context_socket_close (server_mon); |
262 | } |
263 | |
264 | // This checks that peers respect the TTL value in ping messages |
265 | // We set up a mock ZMTP 3 client and send a ping message with a TLL |
266 | // to a server that is not doing any heartbeating. Then we sleep, |
267 | // if the server disconnects the client, then we know the TTL did |
268 | // its thing correctly. |
269 | static void test_heartbeat_ttl (int client_type_, int server_type_) |
270 | { |
271 | int rc, value; |
272 | char my_endpoint[MAX_SOCKET_STRING]; |
273 | |
274 | void *server, *server_mon, *client; |
275 | prep_server_socket (0, 0, &server, &server_mon, my_endpoint, |
276 | MAX_SOCKET_STRING, server_type_); |
277 | |
278 | client = test_context_socket (client_type_); |
279 | |
280 | // Set the heartbeat TTL to 0.1 seconds |
281 | value = 100; |
282 | TEST_ASSERT_SUCCESS_ERRNO ( |
283 | zmq_setsockopt (client, ZMQ_HEARTBEAT_TTL, &value, sizeof (value))); |
284 | |
285 | // Set the heartbeat interval to much longer than the TTL so that |
286 | // the socket times out oon the remote side. |
287 | value = 250; |
288 | TEST_ASSERT_SUCCESS_ERRNO ( |
289 | zmq_setsockopt (client, ZMQ_HEARTBEAT_IVL, &value, sizeof (value))); |
290 | |
291 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint)); |
292 | |
293 | // By now everything should report as connected |
294 | rc = get_monitor_event (server_mon); |
295 | TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc); |
296 | |
297 | msleep (SETTLE_TIME); |
298 | |
299 | // We should have been disconnected |
300 | rc = get_monitor_event (server_mon); |
301 | TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_DISCONNECTED, rc); |
302 | |
303 | test_context_socket_close (server); |
304 | test_context_socket_close (server_mon); |
305 | test_context_socket_close (client); |
306 | } |
307 | |
308 | // This checks for normal operation - that is pings and pongs being |
309 | // exchanged normally. There should be an accepted event on the server, |
310 | // and then no event afterwards. |
311 | static void |
312 | test_heartbeat_notimeout (int is_curve_, int client_type_, int server_type_) |
313 | { |
314 | int rc; |
315 | char my_endpoint[MAX_SOCKET_STRING]; |
316 | |
317 | void *server, *server_mon; |
318 | prep_server_socket (1, is_curve_, &server, &server_mon, my_endpoint, |
319 | MAX_SOCKET_STRING, server_type_); |
320 | |
321 | void *client = test_context_socket (client_type_); |
322 | if (is_curve_) |
323 | setup_curve (client, 0); |
324 | rc = zmq_connect (client, my_endpoint); |
325 | |
326 | // Give it a sec to connect and handshake |
327 | msleep (SETTLE_TIME); |
328 | |
329 | // By now everything should report as connected |
330 | rc = get_monitor_event (server_mon); |
331 | TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc); |
332 | |
333 | // We should still be connected because pings and pongs are happenin' |
334 | TEST_ASSERT_EQUAL_INT (-1, get_monitor_event (server_mon)); |
335 | |
336 | test_context_socket_close (client); |
337 | test_context_socket_close (server); |
338 | test_context_socket_close (server_mon); |
339 | } |
340 | |
341 | void test_heartbeat_timeout_router () |
342 | { |
343 | test_heartbeat_timeout (ZMQ_ROUTER, 0); |
344 | } |
345 | |
346 | void test_heartbeat_timeout_router_mock_ping () |
347 | { |
348 | test_heartbeat_timeout (ZMQ_ROUTER, 1); |
349 | } |
350 | |
351 | #define DEFINE_TESTS(first, second, first_define, second_define) \ |
352 | void test_heartbeat_ttl_##first##_##second () \ |
353 | { \ |
354 | test_heartbeat_ttl (first_define, second_define); \ |
355 | } \ |
356 | void test_heartbeat_notimeout_##first##_##second () \ |
357 | { \ |
358 | test_heartbeat_notimeout (0, first_define, second_define); \ |
359 | } \ |
360 | void test_heartbeat_notimeout_##first##_##second##_with_curve () \ |
361 | { \ |
362 | test_heartbeat_notimeout (1, first_define, second_define); \ |
363 | } |
364 | |
365 | DEFINE_TESTS (dealer, router, ZMQ_DEALER, ZMQ_ROUTER) |
366 | DEFINE_TESTS (req, rep, ZMQ_REQ, ZMQ_REP) |
367 | DEFINE_TESTS (pull, push, ZMQ_PULL, ZMQ_PUSH) |
368 | DEFINE_TESTS (sub, pub, ZMQ_SUB, ZMQ_PUB) |
369 | DEFINE_TESTS (pair, pair, ZMQ_PAIR, ZMQ_PAIR) |
370 | |
371 | #ifdef ZMQ_BUILD_DRAFT_API |
372 | DEFINE_TESTS (gather, scatter, ZMQ_GATHER, ZMQ_SCATTER) |
373 | DEFINE_TESTS (client, server, ZMQ_CLIENT, ZMQ_SERVER) |
374 | #endif |
375 | |
376 | const int deciseconds_per_millisecond = 100; |
377 | const int heartbeat_ttl_max = |
378 | (UINT16_MAX + 1) * deciseconds_per_millisecond - 1; |
379 | |
380 | void test_setsockopt_heartbeat_success (const int value_) |
381 | { |
382 | void *const socket = test_context_socket (ZMQ_PAIR); |
383 | TEST_ASSERT_SUCCESS_ERRNO ( |
384 | zmq_setsockopt (socket, ZMQ_HEARTBEAT_TTL, &value_, sizeof (value_))); |
385 | |
386 | int value_read; |
387 | size_t value_read_size = sizeof (value_read); |
388 | TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt (socket, ZMQ_HEARTBEAT_TTL, |
389 | &value_read, &value_read_size)); |
390 | |
391 | TEST_ASSERT_EQUAL_INT (value_ - value_ % deciseconds_per_millisecond, |
392 | value_read); |
393 | |
394 | test_context_socket_close (socket); |
395 | } |
396 | |
397 | void test_setsockopt_heartbeat_ttl_max () |
398 | { |
399 | test_setsockopt_heartbeat_success (heartbeat_ttl_max); |
400 | } |
401 | |
402 | void test_setsockopt_heartbeat_ttl_more_than_max_fails () |
403 | { |
404 | void *const socket = test_context_socket (ZMQ_PAIR); |
405 | const int value = heartbeat_ttl_max + 1; |
406 | TEST_ASSERT_FAILURE_ERRNO ( |
407 | EINVAL, |
408 | zmq_setsockopt (socket, ZMQ_HEARTBEAT_TTL, &value, sizeof (value))); |
409 | |
410 | test_context_socket_close (socket); |
411 | } |
412 | |
413 | void test_setsockopt_heartbeat_ttl_near_zero () |
414 | { |
415 | test_setsockopt_heartbeat_success (deciseconds_per_millisecond - 1); |
416 | } |
417 | |
418 | int main (void) |
419 | { |
420 | setup_test_environment (); |
421 | |
422 | UNITY_BEGIN (); |
423 | |
424 | RUN_TEST (test_heartbeat_timeout_router); |
425 | RUN_TEST (test_heartbeat_timeout_router_mock_ping); |
426 | |
427 | RUN_TEST (test_heartbeat_ttl_dealer_router); |
428 | RUN_TEST (test_heartbeat_ttl_req_rep); |
429 | RUN_TEST (test_heartbeat_ttl_pull_push); |
430 | RUN_TEST (test_heartbeat_ttl_sub_pub); |
431 | RUN_TEST (test_heartbeat_ttl_pair_pair); |
432 | |
433 | RUN_TEST (test_setsockopt_heartbeat_ttl_max); |
434 | RUN_TEST (test_setsockopt_heartbeat_ttl_more_than_max_fails); |
435 | RUN_TEST (test_setsockopt_heartbeat_ttl_near_zero); |
436 | |
437 | RUN_TEST (test_heartbeat_notimeout_dealer_router); |
438 | RUN_TEST (test_heartbeat_notimeout_req_rep); |
439 | RUN_TEST (test_heartbeat_notimeout_pull_push); |
440 | RUN_TEST (test_heartbeat_notimeout_sub_pub); |
441 | RUN_TEST (test_heartbeat_notimeout_pair_pair); |
442 | |
443 | RUN_TEST (test_heartbeat_notimeout_dealer_router_with_curve); |
444 | RUN_TEST (test_heartbeat_notimeout_req_rep_with_curve); |
445 | RUN_TEST (test_heartbeat_notimeout_pull_push_with_curve); |
446 | RUN_TEST (test_heartbeat_notimeout_sub_pub_with_curve); |
447 | RUN_TEST (test_heartbeat_notimeout_pair_pair_with_curve); |
448 | |
449 | #ifdef ZMQ_BUILD_DRAFT_API |
450 | RUN_TEST (test_heartbeat_ttl_client_server); |
451 | RUN_TEST (test_heartbeat_ttl_gather_scatter); |
452 | |
453 | RUN_TEST (test_heartbeat_notimeout_client_server); |
454 | RUN_TEST (test_heartbeat_notimeout_gather_scatter); |
455 | |
456 | RUN_TEST (test_heartbeat_notimeout_client_server_with_curve); |
457 | RUN_TEST (test_heartbeat_notimeout_gather_scatter_with_curve); |
458 | #endif |
459 | |
460 | return UNITY_END (); |
461 | } |
462 | |