1 | /* |
2 | Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "testutil.hpp" |
31 | #include "testutil_unity.hpp" |
32 | |
33 | #include <string.h> |
34 | |
35 | SETUP_TEARDOWN_TESTCONTEXT |
36 | |
37 | // ZMTP protocol greeting structure |
38 | |
39 | typedef uint8_t byte; |
40 | typedef struct |
41 | { |
42 | byte signature[10]; // 0xFF 8*0x00 0x7F |
43 | byte version[2]; // 0x03 0x00 for ZMTP/3.0 |
44 | byte mechanism[20]; // "NULL" |
45 | byte as_server; |
46 | byte filler[31]; |
47 | } zmtp_greeting_t; |
48 | |
49 | #define ZMTP_DEALER 5 // Socket type constants |
50 | |
51 | // This is a greeting matching what 0MQ will send us; note the |
52 | // 8-byte size is set to 1 for backwards compatibility |
53 | |
54 | static zmtp_greeting_t greeting = { |
55 | {0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F}, {3, 0}, {'N', 'U', 'L', 'L'}, 0, {0}}; |
56 | |
57 | static void test_stream_to_dealer () |
58 | { |
59 | int rc; |
60 | char my_endpoint[MAX_SOCKET_STRING]; |
61 | |
62 | // We'll be using this socket in raw mode |
63 | void *stream = test_context_socket (ZMQ_STREAM); |
64 | |
65 | int zero = 0; |
66 | TEST_ASSERT_SUCCESS_ERRNO ( |
67 | zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero))); |
68 | int enabled = 1; |
69 | TEST_ASSERT_SUCCESS_ERRNO ( |
70 | zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled))); |
71 | bind_loopback_ipv4 (stream, my_endpoint, sizeof my_endpoint); |
72 | |
73 | // We'll be using this socket as the other peer |
74 | void *dealer = test_context_socket (ZMQ_DEALER); |
75 | TEST_ASSERT_SUCCESS_ERRNO ( |
76 | zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero))); |
77 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, my_endpoint)); |
78 | |
79 | // Send a message on the dealer socket |
80 | send_string_expect_success (dealer, "Hello" , 0); |
81 | |
82 | // Connecting sends a zero message |
83 | // First frame is routing id |
84 | zmq_msg_t routing_id; |
85 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&routing_id)); |
86 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)); |
87 | TEST_ASSERT_TRUE (zmq_msg_more (&routing_id)); |
88 | |
89 | // Verify the existence of Peer-Address metadata |
90 | char const *peer_address = zmq_msg_gets (&routing_id, "Peer-Address" ); |
91 | TEST_ASSERT_NOT_NULL (peer_address); |
92 | TEST_ASSERT_EQUAL_STRING ("127.0.0.1" , peer_address); |
93 | |
94 | // Second frame is zero |
95 | byte buffer[255]; |
96 | TEST_ASSERT_EQUAL_INT ( |
97 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (stream, buffer, 255, 0))); |
98 | |
99 | // Verify the existence of Peer-Address metadata |
100 | peer_address = zmq_msg_gets (&routing_id, "Peer-Address" ); |
101 | TEST_ASSERT_NOT_NULL (peer_address); |
102 | TEST_ASSERT_EQUAL_STRING ("127.0.0.1" , peer_address); |
103 | |
104 | // Real data follows |
105 | // First frame is routing id |
106 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)); |
107 | TEST_ASSERT_TRUE (zmq_msg_more (&routing_id)); |
108 | |
109 | // Verify the existence of Peer-Address metadata |
110 | peer_address = zmq_msg_gets (&routing_id, "Peer-Address" ); |
111 | TEST_ASSERT_NOT_NULL (peer_address); |
112 | TEST_ASSERT_EQUAL_STRING ("127.0.0.1" , peer_address); |
113 | |
114 | // Second frame is greeting signature |
115 | recv_array_expect_success (stream, greeting.signature, 0); |
116 | |
117 | // Send our own protocol greeting |
118 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&routing_id, stream, ZMQ_SNDMORE)); |
119 | TEST_ASSERT_EQUAL_INT ( |
120 | sizeof (greeting), TEST_ASSERT_SUCCESS_ERRNO ( |
121 | zmq_send (stream, &greeting, sizeof (greeting), 0))); |
122 | |
123 | // Now we expect the data from the DEALER socket |
124 | // We want the rest of greeting along with the Ready command |
125 | int bytes_read = 0; |
126 | while (bytes_read < 97) { |
127 | // First frame is the routing id of the connection (each time) |
128 | TEST_ASSERT_GREATER_THAN_INT ( |
129 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0))); |
130 | TEST_ASSERT_TRUE (zmq_msg_more (&routing_id)); |
131 | // Second frame contains the next chunk of data |
132 | TEST_ASSERT_SUCCESS_ERRNO ( |
133 | rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0)); |
134 | bytes_read += rc; |
135 | } |
136 | |
137 | // First two bytes are major and minor version numbers. |
138 | TEST_ASSERT_EQUAL_INT (3, buffer[0]); // ZMTP/3.0 |
139 | TEST_ASSERT_EQUAL_INT (0, buffer[1]); |
140 | |
141 | // Mechanism is "NULL" |
142 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2, |
143 | "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" , 20); |
144 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 54, "\4\51\5READY" , 8); |
145 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 62, "\13Socket-Type\0\0\0\6DEALER" , |
146 | 22); |
147 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 84, "\10Identity\0\0\0\0" , 13); |
148 | |
149 | // Announce we are ready |
150 | memcpy (buffer, "\4\51\5READY" , 8); |
151 | memcpy (buffer + 8, "\13Socket-Type\0\0\0\6ROUTER" , 22); |
152 | memcpy (buffer + 30, "\10Identity\0\0\0\0" , 13); |
153 | |
154 | // Send Ready command |
155 | TEST_ASSERT_GREATER_THAN_INT (0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send ( |
156 | &routing_id, stream, ZMQ_SNDMORE))); |
157 | TEST_ASSERT_EQUAL_INT ( |
158 | 43, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (stream, buffer, 43, 0))); |
159 | |
160 | // Now we expect the data from the DEALER socket |
161 | // First frame is, again, the routing id of the connection |
162 | TEST_ASSERT_GREATER_THAN_INT ( |
163 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0))); |
164 | TEST_ASSERT_TRUE (zmq_msg_more (&routing_id)); |
165 | |
166 | // Third frame contains Hello message from DEALER |
167 | TEST_ASSERT_EQUAL_INT (7, TEST_ASSERT_SUCCESS_ERRNO ( |
168 | zmq_recv (stream, buffer, sizeof buffer, 0))); |
169 | |
170 | // Then we have a 5-byte message "Hello" |
171 | TEST_ASSERT_EQUAL_INT (0, buffer[0]); // Flags = 0 |
172 | TEST_ASSERT_EQUAL_INT (5, buffer[1]); // Size = 5 |
173 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2, "Hello" , 5); |
174 | |
175 | // Send "World" back to DEALER |
176 | TEST_ASSERT_GREATER_THAN_INT (0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send ( |
177 | &routing_id, stream, ZMQ_SNDMORE))); |
178 | byte world[] = {0, 5, 'W', 'o', 'r', 'l', 'd'}; |
179 | TEST_ASSERT_EQUAL_INT ( |
180 | sizeof (world), |
181 | TEST_ASSERT_SUCCESS_ERRNO (zmq_send (stream, world, sizeof (world), 0))); |
182 | |
183 | // Expect response on DEALER socket |
184 | recv_string_expect_success (dealer, "World" , 0); |
185 | |
186 | // Test large messages over STREAM socket |
187 | #define size 64000 |
188 | uint8_t msgout[size]; |
189 | memset (msgout, 0xAB, size); |
190 | zmq_send (dealer, msgout, size, 0); |
191 | |
192 | uint8_t msgin[9 + size]; |
193 | memset (msgin, 0, 9 + size); |
194 | bytes_read = 0; |
195 | while (bytes_read < 9 + size) { |
196 | // Get routing id frame |
197 | TEST_ASSERT_GREATER_THAN_INT ( |
198 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (stream, buffer, 256, 0))); |
199 | // Get next chunk |
200 | TEST_ASSERT_GREATER_THAN_INT ( |
201 | 0, |
202 | TEST_ASSERT_SUCCESS_ERRNO (rc = zmq_recv (stream, msgin + bytes_read, |
203 | 9 + size - bytes_read, 0))); |
204 | bytes_read += rc; |
205 | } |
206 | for (int byte_nbr = 0; byte_nbr < size; byte_nbr++) { |
207 | TEST_ASSERT_EQUAL_UINT8 (0xAB, msgin[9 + byte_nbr]); |
208 | } |
209 | test_context_socket_close (dealer); |
210 | test_context_socket_close (stream); |
211 | } |
212 | |
213 | |
214 | static void test_stream_to_stream () |
215 | { |
216 | char my_endpoint[MAX_SOCKET_STRING]; |
217 | // Set-up our context and sockets |
218 | |
219 | void *server = test_context_socket (ZMQ_STREAM); |
220 | int enabled = 1; |
221 | TEST_ASSERT_SUCCESS_ERRNO ( |
222 | zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled))); |
223 | bind_loopback_ipv4 (server, my_endpoint, sizeof my_endpoint); |
224 | |
225 | void *client = test_context_socket (ZMQ_STREAM); |
226 | TEST_ASSERT_SUCCESS_ERRNO ( |
227 | zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled))); |
228 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint)); |
229 | uint8_t id[256]; |
230 | uint8_t buffer[256]; |
231 | |
232 | // Connecting sends a zero message |
233 | // Server: First frame is routing id, second frame is zero |
234 | TEST_ASSERT_GREATER_THAN_INT ( |
235 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0))); |
236 | TEST_ASSERT_EQUAL_INT ( |
237 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, buffer, 256, 0))); |
238 | // Client: First frame is routing id, second frame is zero |
239 | TEST_ASSERT_GREATER_THAN_INT ( |
240 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0))); |
241 | TEST_ASSERT_EQUAL_INT ( |
242 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, buffer, 256, 0))); |
243 | |
244 | // Sent HTTP request on client socket |
245 | // Get server routing id |
246 | size_t id_size = sizeof id; |
247 | TEST_ASSERT_SUCCESS_ERRNO ( |
248 | zmq_getsockopt (client, ZMQ_ROUTING_ID, id, &id_size)); |
249 | // First frame is server routing id |
250 | TEST_ASSERT_EQUAL_INT ((int) id_size, TEST_ASSERT_SUCCESS_ERRNO (zmq_send ( |
251 | client, id, id_size, ZMQ_SNDMORE))); |
252 | // Second frame is HTTP GET request |
253 | TEST_ASSERT_EQUAL_INT ( |
254 | 7, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (client, "GET /\n\n" , 7, 0))); |
255 | |
256 | // Get HTTP request; ID frame and then request |
257 | TEST_ASSERT_GREATER_THAN_INT ( |
258 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0))); |
259 | TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, buffer, 256, 0)); |
260 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, "GET /\n\n" , 7); |
261 | |
262 | // Send reply back to client |
263 | char http_response[] = "HTTP/1.0 200 OK\r\n" |
264 | "Content-Type: text/plain\r\n" |
265 | "\r\n" |
266 | "Hello, World!" ; |
267 | TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, id, id_size, ZMQ_SNDMORE)); |
268 | TEST_ASSERT_SUCCESS_ERRNO ( |
269 | zmq_send (server, http_response, sizeof (http_response), ZMQ_SNDMORE)); |
270 | |
271 | // Send zero to close connection to client |
272 | TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, id, id_size, ZMQ_SNDMORE)); |
273 | TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, NULL, 0, ZMQ_SNDMORE)); |
274 | |
275 | // Get reply at client and check that it's complete |
276 | TEST_ASSERT_GREATER_THAN_INT ( |
277 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0))); |
278 | TEST_ASSERT_EQUAL_INT ( |
279 | sizeof http_response, |
280 | TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, buffer, 256, 0))); |
281 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, http_response, |
282 | sizeof (http_response)); |
283 | |
284 | // // Get disconnection notification |
285 | // FIXME: why does this block? Bug in STREAM disconnect notification? |
286 | // id_size = zmq_recv (client, id, 256, 0); |
287 | // assert (id_size > 0); |
288 | // rc = zmq_recv (client, buffer, 256, 0); |
289 | // assert (rc == 0); |
290 | |
291 | test_context_socket_close (server); |
292 | test_context_socket_close (client); |
293 | } |
294 | |
295 | int main () |
296 | { |
297 | setup_test_environment (); |
298 | |
299 | UNITY_BEGIN (); |
300 | RUN_TEST (test_stream_to_dealer); |
301 | RUN_TEST (test_stream_to_stream); |
302 | return UNITY_END (); |
303 | } |
304 | |