1/*
2 Copyright (c) 2018 Contributors as noted in the AUTHORS file
3
4 This file is part of 0MQ.
5
6 0MQ is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 0MQ is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License
17 along with this program. If not, see <http://www.gnu.org/licenses/>.
18*/
19
20#include "testutil.hpp"
21#include "testutil_unity.hpp"
22#if defined(ZMQ_HAVE_WINDOWS)
23#include <winsock2.h>
24#include <ws2tcpip.h>
25#include <stdexcept>
26#define close closesocket
27typedef SOCKET raw_socket;
28#else
29#include <arpa/inet.h>
30#include <unistd.h>
31typedef int raw_socket;
32#endif
33
34#include <stdlib.h>
35#include <string.h>
36
37SETUP_TEARDOWN_TESTCONTEXT
38
39// Read one event off the monitor socket; return value and address
40// by reference, if not null, and event number by value. Returns -1
41// in case of error.
42
43static int get_monitor_event (void *monitor_)
44{
45 for (int i = 0; i < 2; i++) {
46 // First frame in message contains event number and value
47 zmq_msg_t msg;
48 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
49 if (zmq_msg_recv (&msg, monitor_, ZMQ_DONTWAIT) == -1) {
50 msleep (SETTLE_TIME);
51 continue; // Interrupted, presumably
52 }
53 TEST_ASSERT_TRUE (zmq_msg_more (&msg));
54
55 uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
56 uint16_t event = *reinterpret_cast<uint16_t *> (data);
57
58 // Second frame in message contains event address
59 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
60 if (zmq_msg_recv (&msg, monitor_, 0) == -1) {
61 return -1; // Interrupted, presumably
62 }
63 TEST_ASSERT_FALSE (zmq_msg_more (&msg));
64
65 return event;
66 }
67 return -1;
68}
69
70static void recv_with_retry (raw_socket fd_, char *buffer_, int bytes_)
71{
72 int received = 0;
73 while (true) {
74 int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
75 recv (fd_, buffer_ + received, bytes_ - received, 0));
76 TEST_ASSERT_GREATER_THAN_INT (0, rc);
77 received += rc;
78 TEST_ASSERT_LESS_OR_EQUAL_INT (bytes_, received);
79 if (received == bytes_)
80 break;
81 }
82}
83
84static void mock_handshake (raw_socket fd_)
85{
86 const uint8_t zmtp_greeting[33] = {0xff, 0, 0, 0, 0, 0, 0, 0, 0,
87 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0};
88 char buffer[128];
89 memset (buffer, 0, sizeof (buffer));
90 memcpy (buffer, zmtp_greeting, sizeof (zmtp_greeting));
91
92 int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 64, 0));
93 TEST_ASSERT_EQUAL_INT (64, rc);
94
95 recv_with_retry (fd_, buffer, 64);
96
97 const uint8_t zmtp_ready[27] = {
98 4, 25, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e',
99 't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 3, 'S', 'U', 'B'};
100
101 memset (buffer, 0, sizeof (buffer));
102 memcpy (buffer, zmtp_ready, 27);
103 rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 27, 0));
104 TEST_ASSERT_EQUAL_INT (27, rc);
105
106 // greeting - XPUB so one extra byte
107 recv_with_retry (fd_, buffer, 28);
108}
109
110static void prep_server_socket (void **server_out_,
111 void **mon_out_,
112 char *endpoint_,
113 size_t ep_length_)
114{
115 // We'll be using this socket in raw mode
116 void *server = test_context_socket (ZMQ_XPUB);
117
118 int value = 0;
119 TEST_ASSERT_SUCCESS_ERRNO (
120 zmq_setsockopt (server, ZMQ_LINGER, &value, sizeof (value)));
121
122 bind_loopback_ipv4 (server, endpoint_, ep_length_);
123
124 // Create and connect a socket for collecting monitor events on xpub
125 void *server_mon = test_context_socket (ZMQ_PAIR);
126
127 TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor (
128 server, "inproc://monitor-dealer",
129 ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_ACCEPTED));
130
131 // Connect to the inproc endpoint so we'll get events
132 TEST_ASSERT_SUCCESS_ERRNO (
133 zmq_connect (server_mon, "inproc://monitor-dealer"));
134
135 *server_out_ = server;
136 *mon_out_ = server_mon;
137}
138
139static void test_mock_sub (bool sub_command_)
140{
141 int rc;
142 char my_endpoint[MAX_SOCKET_STRING];
143
144 void *server, *server_mon;
145 prep_server_socket (&server, &server_mon, my_endpoint, MAX_SOCKET_STRING);
146
147 struct sockaddr_in ip4addr;
148 raw_socket s;
149
150 ip4addr.sin_family = AF_INET;
151 ip4addr.sin_port = htons (atoi (strrchr (my_endpoint, ':') + 1));
152#if defined(ZMQ_HAVE_WINDOWS) && (_WIN32_WINNT < 0x0600)
153 ip4addr.sin_addr.s_addr = inet_addr ("127.0.0.1");
154#else
155 inet_pton (AF_INET, "127.0.0.1", &ip4addr.sin_addr);
156#endif
157
158 s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
159 rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
160 connect (s, (struct sockaddr *) &ip4addr, sizeof ip4addr));
161 TEST_ASSERT_GREATER_THAN_INT (-1, rc);
162
163 // Mock a ZMTP 3 client so we can forcibly try sub commands
164 mock_handshake (s);
165
166 // By now everything should report as connected
167 rc = get_monitor_event (server_mon);
168 TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc);
169
170 if (sub_command_) {
171 const uint8_t sub[13] = {4, 11, 9, 'S', 'U', 'B', 'S',
172 'C', 'R', 'I', 'B', 'E', 'A'};
173 rc =
174 TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 13, 0));
175 TEST_ASSERT_EQUAL_INT (13, rc);
176 } else {
177 const uint8_t sub[4] = {0, 2, 1, 'A'};
178 rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 4, 0));
179 TEST_ASSERT_EQUAL_INT (4, rc);
180 }
181
182 char buffer[16];
183 memset (buffer, 0, sizeof (buffer));
184 rc = zmq_recv (server, buffer, 2, 0);
185 TEST_ASSERT_EQUAL_INT (2, rc);
186 TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\1A", 2));
187
188 rc = zmq_send (server, "ALOL", 4, 0);
189 TEST_ASSERT_EQUAL_INT (4, rc);
190
191 memset (buffer, 0, sizeof (buffer));
192 recv_with_retry (s, buffer, 6);
193 TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\4ALOL", 6));
194
195 close (s);
196
197 test_context_socket_close (server);
198 test_context_socket_close (server_mon);
199}
200
201void test_mock_sub_command ()
202{
203 test_mock_sub (true);
204}
205
206void test_mock_sub_legacy ()
207{
208 test_mock_sub (false);
209}
210
211int main (void)
212{
213 setup_test_environment ();
214
215 UNITY_BEGIN ();
216
217 RUN_TEST (test_mock_sub_command);
218 RUN_TEST (test_mock_sub_legacy);
219
220 return UNITY_END ();
221}
222