| 1 | /* |
| 2 | Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file |
| 3 | |
| 4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
| 5 | |
| 6 | libzmq is free software; you can redistribute it and/or modify it under |
| 7 | the terms of the GNU Lesser General Public License (LGPL) as published |
| 8 | by the Free Software Foundation; either version 3 of the License, or |
| 9 | (at your option) any later version. |
| 10 | |
| 11 | As a special exception, the Contributors give you permission to link |
| 12 | this library with independent modules to produce an executable, |
| 13 | regardless of the license terms of these independent modules, and to |
| 14 | copy and distribute the resulting executable under terms of your choice, |
| 15 | provided that you also meet, for each linked independent module, the |
| 16 | terms and conditions of the license of that module. An independent |
| 17 | module is a module which is not derived from or based on this library. |
| 18 | If you modify this library, you must extend this exception to your |
| 19 | version of the library. |
| 20 | |
| 21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
| 22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
| 23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
| 24 | License for more details. |
| 25 | |
| 26 | You should have received a copy of the GNU Lesser General Public License |
| 27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
| 28 | */ |
| 29 | |
| 30 | #include "testutil.hpp" |
| 31 | #include "testutil_unity.hpp" |
| 32 | |
| 33 | #include <string.h> |
| 34 | |
| 35 | SETUP_TEARDOWN_TESTCONTEXT |
| 36 | |
| 37 | // ZMTP protocol greeting structure |
| 38 | |
| 39 | typedef uint8_t byte; |
| 40 | typedef struct |
| 41 | { |
| 42 | byte signature[10]; // 0xFF 8*0x00 0x7F |
| 43 | byte version[2]; // 0x03 0x00 for ZMTP/3.0 |
| 44 | byte mechanism[20]; // "NULL" |
| 45 | byte as_server; |
| 46 | byte filler[31]; |
| 47 | } zmtp_greeting_t; |
| 48 | |
| 49 | #define ZMTP_DEALER 5 // Socket type constants |
| 50 | |
| 51 | // This is a greeting matching what 0MQ will send us; note the |
| 52 | // 8-byte size is set to 1 for backwards compatibility |
| 53 | |
| 54 | static zmtp_greeting_t greeting = { |
| 55 | {0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F}, {3, 0}, {'N', 'U', 'L', 'L'}, 0, {0}}; |
| 56 | |
| 57 | static void test_stream_to_dealer () |
| 58 | { |
| 59 | int rc; |
| 60 | char my_endpoint[MAX_SOCKET_STRING]; |
| 61 | |
| 62 | // We'll be using this socket in raw mode |
| 63 | void *stream = test_context_socket (ZMQ_STREAM); |
| 64 | |
| 65 | int zero = 0; |
| 66 | TEST_ASSERT_SUCCESS_ERRNO ( |
| 67 | zmq_setsockopt (stream, ZMQ_LINGER, &zero, sizeof (zero))); |
| 68 | int enabled = 1; |
| 69 | TEST_ASSERT_SUCCESS_ERRNO ( |
| 70 | zmq_setsockopt (stream, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled))); |
| 71 | bind_loopback_ipv4 (stream, my_endpoint, sizeof my_endpoint); |
| 72 | |
| 73 | // We'll be using this socket as the other peer |
| 74 | void *dealer = test_context_socket (ZMQ_DEALER); |
| 75 | TEST_ASSERT_SUCCESS_ERRNO ( |
| 76 | zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero))); |
| 77 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, my_endpoint)); |
| 78 | |
| 79 | // Send a message on the dealer socket |
| 80 | send_string_expect_success (dealer, "Hello" , 0); |
| 81 | |
| 82 | // Connecting sends a zero message |
| 83 | // First frame is routing id |
| 84 | zmq_msg_t routing_id; |
| 85 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&routing_id)); |
| 86 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)); |
| 87 | TEST_ASSERT_TRUE (zmq_msg_more (&routing_id)); |
| 88 | |
| 89 | // Verify the existence of Peer-Address metadata |
| 90 | char const *peer_address = zmq_msg_gets (&routing_id, "Peer-Address" ); |
| 91 | TEST_ASSERT_NOT_NULL (peer_address); |
| 92 | TEST_ASSERT_EQUAL_STRING ("127.0.0.1" , peer_address); |
| 93 | |
| 94 | // Second frame is zero |
| 95 | byte buffer[255]; |
| 96 | TEST_ASSERT_EQUAL_INT ( |
| 97 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (stream, buffer, 255, 0))); |
| 98 | |
| 99 | // Verify the existence of Peer-Address metadata |
| 100 | peer_address = zmq_msg_gets (&routing_id, "Peer-Address" ); |
| 101 | TEST_ASSERT_NOT_NULL (peer_address); |
| 102 | TEST_ASSERT_EQUAL_STRING ("127.0.0.1" , peer_address); |
| 103 | |
| 104 | // Real data follows |
| 105 | // First frame is routing id |
| 106 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0)); |
| 107 | TEST_ASSERT_TRUE (zmq_msg_more (&routing_id)); |
| 108 | |
| 109 | // Verify the existence of Peer-Address metadata |
| 110 | peer_address = zmq_msg_gets (&routing_id, "Peer-Address" ); |
| 111 | TEST_ASSERT_NOT_NULL (peer_address); |
| 112 | TEST_ASSERT_EQUAL_STRING ("127.0.0.1" , peer_address); |
| 113 | |
| 114 | // Second frame is greeting signature |
| 115 | recv_array_expect_success (stream, greeting.signature, 0); |
| 116 | |
| 117 | // Send our own protocol greeting |
| 118 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send (&routing_id, stream, ZMQ_SNDMORE)); |
| 119 | TEST_ASSERT_EQUAL_INT ( |
| 120 | sizeof (greeting), TEST_ASSERT_SUCCESS_ERRNO ( |
| 121 | zmq_send (stream, &greeting, sizeof (greeting), 0))); |
| 122 | |
| 123 | // Now we expect the data from the DEALER socket |
| 124 | // We want the rest of greeting along with the Ready command |
| 125 | int bytes_read = 0; |
| 126 | while (bytes_read < 97) { |
| 127 | // First frame is the routing id of the connection (each time) |
| 128 | TEST_ASSERT_GREATER_THAN_INT ( |
| 129 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0))); |
| 130 | TEST_ASSERT_TRUE (zmq_msg_more (&routing_id)); |
| 131 | // Second frame contains the next chunk of data |
| 132 | TEST_ASSERT_SUCCESS_ERRNO ( |
| 133 | rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0)); |
| 134 | bytes_read += rc; |
| 135 | } |
| 136 | |
| 137 | // First two bytes are major and minor version numbers. |
| 138 | TEST_ASSERT_EQUAL_INT (3, buffer[0]); // ZMTP/3.0 |
| 139 | TEST_ASSERT_EQUAL_INT (0, buffer[1]); |
| 140 | |
| 141 | // Mechanism is "NULL" |
| 142 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2, |
| 143 | "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0" , 20); |
| 144 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 54, "\4\51\5READY" , 8); |
| 145 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 62, "\13Socket-Type\0\0\0\6DEALER" , |
| 146 | 22); |
| 147 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 84, "\10Identity\0\0\0\0" , 13); |
| 148 | |
| 149 | // Announce we are ready |
| 150 | memcpy (buffer, "\4\51\5READY" , 8); |
| 151 | memcpy (buffer + 8, "\13Socket-Type\0\0\0\6ROUTER" , 22); |
| 152 | memcpy (buffer + 30, "\10Identity\0\0\0\0" , 13); |
| 153 | |
| 154 | // Send Ready command |
| 155 | TEST_ASSERT_GREATER_THAN_INT (0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send ( |
| 156 | &routing_id, stream, ZMQ_SNDMORE))); |
| 157 | TEST_ASSERT_EQUAL_INT ( |
| 158 | 43, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (stream, buffer, 43, 0))); |
| 159 | |
| 160 | // Now we expect the data from the DEALER socket |
| 161 | // First frame is, again, the routing id of the connection |
| 162 | TEST_ASSERT_GREATER_THAN_INT ( |
| 163 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&routing_id, stream, 0))); |
| 164 | TEST_ASSERT_TRUE (zmq_msg_more (&routing_id)); |
| 165 | |
| 166 | // Third frame contains Hello message from DEALER |
| 167 | TEST_ASSERT_EQUAL_INT (7, TEST_ASSERT_SUCCESS_ERRNO ( |
| 168 | zmq_recv (stream, buffer, sizeof buffer, 0))); |
| 169 | |
| 170 | // Then we have a 5-byte message "Hello" |
| 171 | TEST_ASSERT_EQUAL_INT (0, buffer[0]); // Flags = 0 |
| 172 | TEST_ASSERT_EQUAL_INT (5, buffer[1]); // Size = 5 |
| 173 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2, "Hello" , 5); |
| 174 | |
| 175 | // Send "World" back to DEALER |
| 176 | TEST_ASSERT_GREATER_THAN_INT (0, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_send ( |
| 177 | &routing_id, stream, ZMQ_SNDMORE))); |
| 178 | byte world[] = {0, 5, 'W', 'o', 'r', 'l', 'd'}; |
| 179 | TEST_ASSERT_EQUAL_INT ( |
| 180 | sizeof (world), |
| 181 | TEST_ASSERT_SUCCESS_ERRNO (zmq_send (stream, world, sizeof (world), 0))); |
| 182 | |
| 183 | // Expect response on DEALER socket |
| 184 | recv_string_expect_success (dealer, "World" , 0); |
| 185 | |
| 186 | // Test large messages over STREAM socket |
| 187 | #define size 64000 |
| 188 | uint8_t msgout[size]; |
| 189 | memset (msgout, 0xAB, size); |
| 190 | zmq_send (dealer, msgout, size, 0); |
| 191 | |
| 192 | uint8_t msgin[9 + size]; |
| 193 | memset (msgin, 0, 9 + size); |
| 194 | bytes_read = 0; |
| 195 | while (bytes_read < 9 + size) { |
| 196 | // Get routing id frame |
| 197 | TEST_ASSERT_GREATER_THAN_INT ( |
| 198 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (stream, buffer, 256, 0))); |
| 199 | // Get next chunk |
| 200 | TEST_ASSERT_GREATER_THAN_INT ( |
| 201 | 0, |
| 202 | TEST_ASSERT_SUCCESS_ERRNO (rc = zmq_recv (stream, msgin + bytes_read, |
| 203 | 9 + size - bytes_read, 0))); |
| 204 | bytes_read += rc; |
| 205 | } |
| 206 | for (int byte_nbr = 0; byte_nbr < size; byte_nbr++) { |
| 207 | TEST_ASSERT_EQUAL_UINT8 (0xAB, msgin[9 + byte_nbr]); |
| 208 | } |
| 209 | test_context_socket_close (dealer); |
| 210 | test_context_socket_close (stream); |
| 211 | } |
| 212 | |
| 213 | |
| 214 | static void test_stream_to_stream () |
| 215 | { |
| 216 | char my_endpoint[MAX_SOCKET_STRING]; |
| 217 | // Set-up our context and sockets |
| 218 | |
| 219 | void *server = test_context_socket (ZMQ_STREAM); |
| 220 | int enabled = 1; |
| 221 | TEST_ASSERT_SUCCESS_ERRNO ( |
| 222 | zmq_setsockopt (server, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled))); |
| 223 | bind_loopback_ipv4 (server, my_endpoint, sizeof my_endpoint); |
| 224 | |
| 225 | void *client = test_context_socket (ZMQ_STREAM); |
| 226 | TEST_ASSERT_SUCCESS_ERRNO ( |
| 227 | zmq_setsockopt (client, ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled))); |
| 228 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint)); |
| 229 | uint8_t id[256]; |
| 230 | uint8_t buffer[256]; |
| 231 | |
| 232 | // Connecting sends a zero message |
| 233 | // Server: First frame is routing id, second frame is zero |
| 234 | TEST_ASSERT_GREATER_THAN_INT ( |
| 235 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0))); |
| 236 | TEST_ASSERT_EQUAL_INT ( |
| 237 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, buffer, 256, 0))); |
| 238 | // Client: First frame is routing id, second frame is zero |
| 239 | TEST_ASSERT_GREATER_THAN_INT ( |
| 240 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0))); |
| 241 | TEST_ASSERT_EQUAL_INT ( |
| 242 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, buffer, 256, 0))); |
| 243 | |
| 244 | // Sent HTTP request on client socket |
| 245 | // Get server routing id |
| 246 | size_t id_size = sizeof id; |
| 247 | TEST_ASSERT_SUCCESS_ERRNO ( |
| 248 | zmq_getsockopt (client, ZMQ_ROUTING_ID, id, &id_size)); |
| 249 | // First frame is server routing id |
| 250 | TEST_ASSERT_EQUAL_INT ((int) id_size, TEST_ASSERT_SUCCESS_ERRNO (zmq_send ( |
| 251 | client, id, id_size, ZMQ_SNDMORE))); |
| 252 | // Second frame is HTTP GET request |
| 253 | TEST_ASSERT_EQUAL_INT ( |
| 254 | 7, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (client, "GET /\n\n" , 7, 0))); |
| 255 | |
| 256 | // Get HTTP request; ID frame and then request |
| 257 | TEST_ASSERT_GREATER_THAN_INT ( |
| 258 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, id, 256, 0))); |
| 259 | TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (server, buffer, 256, 0)); |
| 260 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, "GET /\n\n" , 7); |
| 261 | |
| 262 | // Send reply back to client |
| 263 | char http_response[] = "HTTP/1.0 200 OK\r\n" |
| 264 | "Content-Type: text/plain\r\n" |
| 265 | "\r\n" |
| 266 | "Hello, World!" ; |
| 267 | TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, id, id_size, ZMQ_SNDMORE)); |
| 268 | TEST_ASSERT_SUCCESS_ERRNO ( |
| 269 | zmq_send (server, http_response, sizeof (http_response), ZMQ_SNDMORE)); |
| 270 | |
| 271 | // Send zero to close connection to client |
| 272 | TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, id, id_size, ZMQ_SNDMORE)); |
| 273 | TEST_ASSERT_SUCCESS_ERRNO (zmq_send (server, NULL, 0, ZMQ_SNDMORE)); |
| 274 | |
| 275 | // Get reply at client and check that it's complete |
| 276 | TEST_ASSERT_GREATER_THAN_INT ( |
| 277 | 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, id, 256, 0))); |
| 278 | TEST_ASSERT_EQUAL_INT ( |
| 279 | sizeof http_response, |
| 280 | TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (client, buffer, 256, 0))); |
| 281 | TEST_ASSERT_EQUAL_INT8_ARRAY (buffer, http_response, |
| 282 | sizeof (http_response)); |
| 283 | |
| 284 | // // Get disconnection notification |
| 285 | // FIXME: why does this block? Bug in STREAM disconnect notification? |
| 286 | // id_size = zmq_recv (client, id, 256, 0); |
| 287 | // assert (id_size > 0); |
| 288 | // rc = zmq_recv (client, buffer, 256, 0); |
| 289 | // assert (rc == 0); |
| 290 | |
| 291 | test_context_socket_close (server); |
| 292 | test_context_socket_close (client); |
| 293 | } |
| 294 | |
| 295 | int main () |
| 296 | { |
| 297 | setup_test_environment (); |
| 298 | |
| 299 | UNITY_BEGIN (); |
| 300 | RUN_TEST (test_stream_to_dealer); |
| 301 | RUN_TEST (test_stream_to_stream); |
| 302 | return UNITY_END (); |
| 303 | } |
| 304 | |