1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33#include <string.h>
34
35SETUP_TEARDOWN_TESTCONTEXT
36
37// ZMTP protocol greeting structure
38
39typedef uint8_t byte;
40typedef struct
41{
42 byte signature[10]; // 0xFF 8*0x00 0x7F
43 byte version[2]; // 0x03 0x00 for ZMTP/3.0
44 byte mechanism[20]; // "NULL"
45 byte as_server;
46 byte filler[31];
47} zmtp_greeting_t;
48
49#define ZMTP_DEALER 5 // Socket type constants
50
51// This is a greeting matching what 0MQ will send us; note the
52// 8-byte size is set to 1 for backwards compatibility
53
54static zmtp_greeting_t greeting = {
55 {0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F}, {3, 0}, {'N', 'U', 'L', 'L'}, 0, {0}};
56
57static void test_stream_to_dealer ()
58{
59 int rc;
60 char my_endpoint[MAX_SOCKET_STRING];
61
62 // We'll be using this socket in raw mode
63 void *stream = test_context_socket (ZMQ_STREAM);
64
65 int zero = 0;
66 TEST_ASSERT_SUCCESS_ERRNO (
67 zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero)));
68 int enabled = 1;
69 TEST_ASSERT_SUCCESS_ERRNO (
70 zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
71 bind_loopback_ipv4 (stream, my_endpoint, sizeof my_endpoint);
72
73 // We'll be using this socket as the other peer
74 void *dealer = test_context_socket (ZMQ_DEALER);
75 TEST_ASSERT_SUCCESS_ERRNO (
76 zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero)));
77 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, my_endpoint));
78
79 // Send a message on the dealer socket
80 send_string_expect_success (dealer, "Hello", 0);
81
82 // Connecting sends a zero message
83 // First frame is routing id
84 zmq_msg_t routing_id;
85 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&routing_id));
86 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0));
87 TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
88
89 // Verify the existence of Peer-Address metadata
90 char const *peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
91 TEST_ASSERT_NOT_NULL (peer_address);
92 TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
93
94 // Second frame is zero
95 byte buffer[255];
96 TEST_ASSERT_EQUAL_INT (
97 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (stream, buffer, 255, 0)));
98
99 // Verify the existence of Peer-Address metadata
100 peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
101 TEST_ASSERT_NOT_NULL (peer_address);
102 TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
103
104 // Real data follows
105 // First frame is routing id
106 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0));
107 TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
108
109 // Verify the existence of Peer-Address metadata
110 peer_address = zmq_msg_gets (&routing_id, "Peer-Address");
111 TEST_ASSERT_NOT_NULL (peer_address);
112 TEST_ASSERT_EQUAL_STRING ("127.0.0.1", peer_address);
113
114 // Second frame is greeting signature
115 recv_array_expect_success (stream, greeting.signature, 0);
116
117 // Send our own protocol greeting
118 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&routing_id, stream, ZMQ_SNDMORE));
119 TEST_ASSERT_EQUAL_INT (
120 sizeof (greeting), TEST_ASSERT_SUCCESS_ERRNO (
121 zmq_send (stream, &greeting, sizeof (greeting), 0)));
122
123 // Now we expect the data from the DEALER socket
124 // We want the rest of greeting along with the Ready command
125 int bytes_read = 0;
126 while (bytes_read < 97) {
127 // First frame is the routing id of the connection (each time)
128 TEST_ASSERT_GREATER_THAN_INT (
129 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)));
130 TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
131 // Second frame contains the next chunk of data
132 TEST_ASSERT_SUCCESS_ERRNO (
133 rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0));
134 bytes_read += rc;
135 }
136
137 // First two bytes are major and minor version numbers.
138 TEST_ASSERT_EQUAL_INT (3, buffer[0]); // ZMTP/3.0
139 TEST_ASSERT_EQUAL_INT (0, buffer[1]);
140
141 // Mechanism is "NULL"
142 TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2,
143 "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20);
144 TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 54, "\4\51\5READY", 8);
145 TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 62, "\13Socket-Type\0\0\0\6DEALER",
146 22);
147 TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 84, "\10Identity\0\0\0\0", 13);
148
149 // Announce we are ready
150 memcpy (buffer, "\4\51\5READY", 8);
151 memcpy (buffer + 8, "\13Socket-Type\0\0\0\6ROUTER", 22);
152 memcpy (buffer + 30, "\10Identity\0\0\0\0", 13);
153
154 // Send Ready command
155 TEST_ASSERT_GREATER_THAN_INT (0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (
156 &routing_id, stream, ZMQ_SNDMORE)));
157 TEST_ASSERT_EQUAL_INT (
158 43, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (stream, buffer, 43, 0)));
159
160 // Now we expect the data from the DEALER socket
161 // First frame is, again, the routing id of the connection
162 TEST_ASSERT_GREATER_THAN_INT (
163 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)));
164 TEST_ASSERT_TRUE (zmq_msg_more (&routing_id));
165
166 // Third frame contains Hello message from DEALER
167 TEST_ASSERT_EQUAL_INT (7, TEST_ASSERT_SUCCESS_ERRNO (
168 zmq_recv (stream, buffer, sizeof buffer, 0)));
169
170 // Then we have a 5-byte message "Hello"
171 TEST_ASSERT_EQUAL_INT (0, buffer[0]); // Flags = 0
172 TEST_ASSERT_EQUAL_INT (5, buffer[1]); // Size = 5
173 TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2, "Hello", 5);
174
175 // Send "World" back to DEALER
176 TEST_ASSERT_GREATER_THAN_INT (0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (
177 &routing_id, stream, ZMQ_SNDMORE)));
178 byte world[] = {0, 5, 'W', 'o', 'r', 'l', 'd'};
179 TEST_ASSERT_EQUAL_INT (
180 sizeof (world),
181 TEST_ASSERT_SUCCESS_ERRNO (zmq_send (stream, world, sizeof (world), 0)));
182
183 // Expect response on DEALER socket
184 recv_string_expect_success (dealer, "World", 0);
185
186 // Test large messages over STREAM socket
187#define size 64000
188 uint8_t msgout[size];
189 memset (msgout, 0xAB, size);
190 zmq_send (dealer, msgout, size, 0);
191
192 uint8_t msgin[9 + size];
193 memset (msgin, 0, 9 + size);
194 bytes_read = 0;
195 while (bytes_read < 9 + size) {
196 // Get routing id frame
197 TEST_ASSERT_GREATER_THAN_INT (
198 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (stream, buffer, 256, 0)));
199 // Get next chunk
200 TEST_ASSERT_GREATER_THAN_INT (
201 0,
202 TEST_ASSERT_SUCCESS_ERRNO (rc = zmq_recv (stream, msgin + bytes_read,
203 9 + size - bytes_read, 0)));
204 bytes_read += rc;
205 }
206 for (int byte_nbr = 0; byte_nbr < size; byte_nbr++) {
207 TEST_ASSERT_EQUAL_UINT8 (0xAB, msgin[9 + byte_nbr]);
208 }
209 test_context_socket_close (dealer);
210 test_context_socket_close (stream);
211}
212
213
214static void test_stream_to_stream ()
215{
216 char my_endpoint[MAX_SOCKET_STRING];
217 // Set-up our context and sockets
218
219 void *server = test_context_socket (ZMQ_STREAM);
220 int enabled = 1;
221 TEST_ASSERT_SUCCESS_ERRNO (
222 zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
223 bind_loopback_ipv4 (server, my_endpoint, sizeof my_endpoint);
224
225 void *client = test_context_socket (ZMQ_STREAM);
226 TEST_ASSERT_SUCCESS_ERRNO (
227 zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
228 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint));
229 uint8_t id[256];
230 uint8_t buffer[256];
231
232 // Connecting sends a zero message
233 // Server: First frame is routing id, second frame is zero
234 TEST_ASSERT_GREATER_THAN_INT (
235 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0)));
236 TEST_ASSERT_EQUAL_INT (
237 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, buffer, 256, 0)));
238 // Client: First frame is routing id, second frame is zero
239 TEST_ASSERT_GREATER_THAN_INT (
240 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0)));
241 TEST_ASSERT_EQUAL_INT (
242 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, buffer, 256, 0)));
243
244 // Sent HTTP request on client socket
245 // Get server routing id
246 size_t id_size = sizeof id;
247 TEST_ASSERT_SUCCESS_ERRNO (
248 zmq_getsockopt (client, ZMQ_ROUTING_ID, id, &id_size));
249 // First frame is server routing id
250 TEST_ASSERT_EQUAL_INT ((int) id_size, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (
251 client, id, id_size, ZMQ_SNDMORE)));
252 // Second frame is HTTP GET request
253 TEST_ASSERT_EQUAL_INT (
254 7, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (client, "GET /\n\n", 7, 0)));
255
256 // Get HTTP request; ID frame and then request
257 TEST_ASSERT_GREATER_THAN_INT (
258 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0)));
259 TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, buffer, 256, 0));
260 TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, "GET /\n\n", 7);
261
262 // Send reply back to client
263 char http_response[] = "HTTP/1.0 200 OK\r\n"
264 "Content-Type: text/plain\r\n"
265 "\r\n"
266 "Hello, World!";
267 TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, id, id_size, ZMQ_SNDMORE));
268 TEST_ASSERT_SUCCESS_ERRNO (
269 zmq_send (server, http_response, sizeof (http_response), ZMQ_SNDMORE));
270
271 // Send zero to close connection to client
272 TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, id, id_size, ZMQ_SNDMORE));
273 TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, NULL, 0, ZMQ_SNDMORE));
274
275 // Get reply at client and check that it's complete
276 TEST_ASSERT_GREATER_THAN_INT (
277 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0)));
278 TEST_ASSERT_EQUAL_INT (
279 sizeof http_response,
280 TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, buffer, 256, 0)));
281 TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, http_response,
282 sizeof (http_response));
283
284 // // Get disconnection notification
285 // FIXME: why does this block? Bug in STREAM disconnect notification?
286 // id_size = zmq_recv (client, id, 256, 0);
287 // assert (id_size > 0);
288 // rc = zmq_recv (client, buffer, 256, 0);
289 // assert (rc == 0);
290
291 test_context_socket_close (server);
292 test_context_socket_close (client);
293}
294
295int main ()
296{
297 setup_test_environment ();
298
299 UNITY_BEGIN ();
300 RUN_TEST (test_stream_to_dealer);
301 RUN_TEST (test_stream_to_stream);
302 return UNITY_END ();
303}
304