1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33#include <string.h>
34
35SETUP_TEARDOWN_TESTCONTEXT
36
//  Indices of the two peers; used both to index the socket/poll arrays and
//  to mark whose turn it is in the scripted dialog.
enum
{
    SERVER = 0,
    CLIENT = 1
};
39
//  One line of the scripted dialog between the two peers.
//  Field order matters: the dialog table initializes entries positionally.
struct test_message_t
{
    //  Which peer sends this line (SERVER or CLIENT).
    int turn;
    //  Payload text, stored as a NUL-terminated C string.
    const char *text;
};
45
46// NOTE: messages are sent without null terminator.
47const test_message_t dialog[] = {
48 {CLIENT, "i can haz cheez burger?"},
49 {SERVER, "y u no disonnect?"},
50 {CLIENT, ""},
51};
52const int steps = sizeof (dialog) / sizeof (dialog[0]);
53
54bool has_more (void *socket_)
55{
56 int more = 0;
57 size_t more_size = sizeof (more);
58 int rc = zmq_getsockopt (socket_, ZMQ_RCVMORE, &more, &more_size);
59 if (rc != 0)
60 return false;
61 return more != 0;
62}
63
64void test_stream_disconnect ()
65{
66 size_t len = MAX_SOCKET_STRING;
67 char bind_endpoint[MAX_SOCKET_STRING];
68 char connect_endpoint[MAX_SOCKET_STRING];
69 void *sockets[2];
70
71 sockets[SERVER] = test_context_socket (ZMQ_STREAM);
72 int enabled = 1;
73 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
74 sockets[SERVER], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
75 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sockets[SERVER], "tcp://0.0.0.0:*"));
76 TEST_ASSERT_SUCCESS_ERRNO (
77 zmq_getsockopt (sockets[SERVER], ZMQ_LAST_ENDPOINT, bind_endpoint, &len));
78
79 // Apparently Windows can't connect to 0.0.0.0. A better fix would be welcome.
80#ifdef ZMQ_HAVE_WINDOWS
81 sprintf (connect_endpoint, "tcp://127.0.0.1:%s",
82 strrchr (bind_endpoint, ':') + 1);
83#else
84 strcpy (connect_endpoint, bind_endpoint);
85#endif
86
87 sockets[CLIENT] = test_context_socket (ZMQ_STREAM);
88 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
89 sockets[CLIENT], ZMQ_STREAM_NOTIFY, &enabled, sizeof (enabled)));
90 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sockets[CLIENT], connect_endpoint));
91
92 // wait for connect notification
93 // Server: Grab the 1st frame (peer routing id).
94 zmq_msg_t peer_frame;
95 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
96 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&peer_frame, sockets[SERVER], 0));
97 TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
98 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
99 TEST_ASSERT_TRUE (has_more (sockets[SERVER]));
100
101 // Server: Grab the 2nd frame (actual payload).
102 zmq_msg_t data_frame;
103 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
104 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&data_frame, sockets[SERVER], 0));
105 TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&data_frame));
106 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
107
108 // Client: Grab the 1st frame (peer routing id).
109 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
110 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&peer_frame, sockets[CLIENT], 0));
111 TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
112 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
113 TEST_ASSERT_TRUE (has_more (sockets[CLIENT]));
114
115 // Client: Grab the 2nd frame (actual payload).
116 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
117 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&data_frame, sockets[CLIENT], 0));
118 TEST_ASSERT_EQUAL_INT (0, zmq_msg_size (&data_frame));
119 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
120
121 // Send initial message.
122 char blob_data[256];
123 size_t blob_size = sizeof (blob_data);
124 TEST_ASSERT_SUCCESS_ERRNO (
125 zmq_getsockopt (sockets[CLIENT], ZMQ_ROUTING_ID, blob_data, &blob_size));
126 TEST_ASSERT_GREATER_THAN (0, blob_size);
127 zmq_msg_t msg;
128 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, blob_size));
129 memcpy (zmq_msg_data (&msg), blob_data, blob_size);
130 TEST_ASSERT_SUCCESS_ERRNO (
131 zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE));
132 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
133 TEST_ASSERT_SUCCESS_ERRNO (
134 zmq_msg_init_size (&msg, strlen (dialog[0].text)));
135 memcpy (zmq_msg_data (&msg), dialog[0].text, strlen (dialog[0].text));
136 TEST_ASSERT_SUCCESS_ERRNO (
137 zmq_msg_send (&msg, sockets[dialog[0].turn], ZMQ_SNDMORE));
138 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
139
140 // TODO: make sure this loop doesn't loop forever if something is wrong
141 // with the test (or the implementation).
142
143 int step = 0;
144 while (step < steps) {
145 // Wait until something happens.
146 zmq_pollitem_t items[] = {
147 {sockets[SERVER], 0, ZMQ_POLLIN, 0},
148 {sockets[CLIENT], 0, ZMQ_POLLIN, 0},
149 };
150 TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (items, 2, 100));
151
152 // Check for data received by the server.
153 if (items[SERVER].revents & ZMQ_POLLIN) {
154 TEST_ASSERT_EQUAL_INT (CLIENT, dialog[step].turn);
155
156 // Grab the 1st frame (peer routing id).
157 zmq_msg_t peer_frame;
158 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
159 TEST_ASSERT_SUCCESS_ERRNO (
160 zmq_msg_recv (&peer_frame, sockets[SERVER], 0));
161 TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
162 TEST_ASSERT_TRUE (has_more (sockets[SERVER]));
163
164 // Grab the 2nd frame (actual payload).
165 zmq_msg_t data_frame;
166 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
167 TEST_ASSERT_SUCCESS_ERRNO (
168 zmq_msg_recv (&data_frame, sockets[SERVER], 0));
169
170 // Make sure payload matches what we expect.
171 const char *const data =
172 static_cast<const char *> (zmq_msg_data (&data_frame));
173 const size_t size = zmq_msg_size (&data_frame);
174 // 0-length frame is a disconnection notification. The server
175 // should receive it as the last step in the dialogue.
176 if (size == 0) {
177 ++step;
178 TEST_ASSERT_EQUAL_INT (steps, step);
179 } else {
180 TEST_ASSERT_EQUAL_INT (strlen (dialog[step].text), size);
181 TEST_ASSERT_EQUAL_STRING_LEN (dialog[step].text, data, size);
182
183 ++step;
184
185 TEST_ASSERT_LESS_THAN_INT (steps, step);
186
187 // Prepare the response.
188 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
189 TEST_ASSERT_SUCCESS_ERRNO (
190 zmq_msg_init_size (&data_frame, strlen (dialog[step].text)));
191 memcpy (zmq_msg_data (&data_frame), dialog[step].text,
192 zmq_msg_size (&data_frame));
193
194 // Send the response.
195 TEST_ASSERT_SUCCESS_ERRNO (
196 zmq_msg_send (&peer_frame, sockets[SERVER], ZMQ_SNDMORE));
197 TEST_ASSERT_SUCCESS_ERRNO (
198 zmq_msg_send (&data_frame, sockets[SERVER], ZMQ_SNDMORE));
199 }
200
201 // Release resources.
202 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
203 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
204 }
205
206 // Check for data received by the client.
207 if (items[CLIENT].revents & ZMQ_POLLIN) {
208 TEST_ASSERT_EQUAL_INT (SERVER, dialog[step].turn);
209
210 // Grab the 1st frame (peer routing id).
211 zmq_msg_t peer_frame;
212 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&peer_frame));
213 TEST_ASSERT_SUCCESS_ERRNO (
214 zmq_msg_recv (&peer_frame, sockets[CLIENT], 0));
215 TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&peer_frame));
216 TEST_ASSERT_TRUE (has_more (sockets[CLIENT]));
217
218 // Grab the 2nd frame (actual payload).
219 zmq_msg_t data_frame;
220 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&data_frame));
221 TEST_ASSERT_SUCCESS_ERRNO (
222 zmq_msg_recv (&data_frame, sockets[CLIENT], 0));
223 TEST_ASSERT_GREATER_THAN_INT (0, zmq_msg_size (&data_frame));
224
225 // Make sure payload matches what we expect.
226 const char *const data =
227 static_cast<const char *> (zmq_msg_data (&data_frame));
228 const size_t size = zmq_msg_size (&data_frame);
229 TEST_ASSERT_EQUAL_INT (strlen (dialog[step].text), size);
230 TEST_ASSERT_EQUAL_STRING_LEN (dialog[step].text, data, size);
231
232 ++step;
233
234 // Prepare the response (next line in the dialog).
235 TEST_ASSERT_LESS_THAN_INT (steps, step);
236 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
237 TEST_ASSERT_SUCCESS_ERRNO (
238 zmq_msg_init_size (&data_frame, strlen (dialog[step].text)));
239 memcpy (zmq_msg_data (&data_frame), dialog[step].text,
240 zmq_msg_size (&data_frame));
241
242 // Send the response.
243 TEST_ASSERT_SUCCESS_ERRNO (
244 zmq_msg_send (&peer_frame, sockets[CLIENT], ZMQ_SNDMORE));
245 TEST_ASSERT_SUCCESS_ERRNO (
246 zmq_msg_send (&data_frame, sockets[CLIENT], ZMQ_SNDMORE));
247
248 // Release resources.
249 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&peer_frame));
250 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&data_frame));
251 }
252 }
253 TEST_ASSERT_EQUAL_INT (steps, step);
254 test_context_socket_close (sockets[CLIENT]);
255 test_context_socket_close (sockets[SERVER]);
256}
257
258int main (int, char **)
259{
260 setup_test_environment ();
261
262 UNITY_BEGIN ();
263 RUN_TEST (test_stream_disconnect);
264 return UNITY_END ();
265}
266