1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33#include <string.h>
34
35SETUP_TEARDOWN_TESTCONTEXT
36
37const char *rconn1routing_id = "conn1";
38const char *x_routing_id = "X";
39const char *y_routing_id = "Y";
40const char *z_routing_id = "Z";
41
42void test_stream_2_stream ()
43{
44 char buff[256];
45 const char msg[] = "hi 1";
46 const int disabled = 0;
47 const int zero = 0;
48 char my_endpoint[MAX_SOCKET_STRING];
49
50 // Set up listener STREAM.
51 void *rbind = test_context_socket (ZMQ_STREAM);
52 TEST_ASSERT_SUCCESS_ERRNO (
53 zmq_setsockopt (rbind, ZMQ_STREAM_NOTIFY, &disabled, sizeof (disabled)));
54
55 TEST_ASSERT_SUCCESS_ERRNO (
56 zmq_setsockopt (rbind, ZMQ_LINGER, &zero, sizeof zero));
57 bind_loopback_ipv4 (rbind, my_endpoint, sizeof my_endpoint);
58
59 // Set up connection stream.
60 void *rconn1 = test_context_socket (ZMQ_STREAM);
61 TEST_ASSERT_SUCCESS_ERRNO (
62 zmq_setsockopt (rconn1, ZMQ_LINGER, &zero, sizeof zero));
63
64 // Do the connection.
65 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID,
66 rconn1routing_id,
67 strlen (rconn1routing_id)));
68 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (rconn1, my_endpoint));
69
70 /* Uncomment to test assert on duplicate routing id.
71 // Test duplicate connect attempt.
72 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, rconn1routing_id, strlen(rconn1routing_id)));
73 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (rconn1, bindip));
74*/
75 // Send data to the bound stream.
76 send_string_expect_success (rconn1, rconn1routing_id, ZMQ_SNDMORE);
77 send_string_expect_success (rconn1, msg, 0);
78
79 // Accept data on the bound stream.
80 TEST_ASSERT_GREATER_THAN (
81 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (rbind, buff, 256, 0)));
82 TEST_ASSERT_EQUAL (0, buff[0]); // an auto-generated routing id
83 recv_string_expect_success (rbind, msg, 0);
84
85 // Handle close of the socket.
86 TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (rbind, my_endpoint));
87 test_context_socket_close (rbind);
88 test_context_socket_close (rconn1);
89}
90
91void test_router_2_router (bool named_)
92{
93 char buff[256];
94 const char msg[] = "hi 1";
95 const int zero = 0;
96 char my_endpoint[MAX_SOCKET_STRING];
97
98 // Create bind socket.
99 void *rbind = test_context_socket (ZMQ_ROUTER);
100 TEST_ASSERT_SUCCESS_ERRNO (
101 zmq_setsockopt (rbind, ZMQ_LINGER, &zero, sizeof (zero)));
102 bind_loopback_ipv4 (rbind, my_endpoint, sizeof my_endpoint);
103
104 // Create connection socket.
105 void *rconn1 = test_context_socket (ZMQ_ROUTER);
106 TEST_ASSERT_SUCCESS_ERRNO (
107 zmq_setsockopt (rconn1, ZMQ_LINGER, &zero, sizeof (zero)));
108
109 // If we're in named mode, set some identities.
110 if (named_) {
111 TEST_ASSERT_SUCCESS_ERRNO (
112 zmq_setsockopt (rbind, ZMQ_ROUTING_ID, x_routing_id, 1));
113 TEST_ASSERT_SUCCESS_ERRNO (
114 zmq_setsockopt (rconn1, ZMQ_ROUTING_ID, y_routing_id, 1));
115 }
116
117 // Make call to connect using a connect_routing_id.
118 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID,
119 rconn1routing_id,
120 strlen (rconn1routing_id)));
121 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (rconn1, my_endpoint));
122 /* Uncomment to test assert on duplicate routing id
123 // Test duplicate connect attempt.
124 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (rconn1, ZMQ_CONNECT_ROUTING_ID, rconn1routing_id, strlen (rconn1routing_id)));
125 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (rconn1, bindip));
126*/
127 // Send some data.
128
129 send_string_expect_success (rconn1, rconn1routing_id, ZMQ_SNDMORE);
130 send_string_expect_success (rconn1, msg, 0);
131
132 // Receive the name.
133 const int routing_id_len = zmq_recv (rbind, buff, 256, 0);
134 if (named_) {
135 TEST_ASSERT_EQUAL_INT (strlen (y_routing_id), routing_id_len);
136 TEST_ASSERT_EQUAL_STRING_LEN (y_routing_id, buff, routing_id_len);
137 } else {
138 TEST_ASSERT_TRUE (routing_id_len && 0 == buff[0]);
139 }
140
141 // Receive the data.
142 recv_string_expect_success (rbind, msg, 0);
143
144 // Send some data back.
145 const int ret = zmq_send (rbind, buff, routing_id_len, ZMQ_SNDMORE);
146 TEST_ASSERT_EQUAL_INT (routing_id_len, ret);
147 send_string_expect_success (rbind, "ok", 0);
148
149 // If bound socket identity naming a problem, we'll likely see something funky here.
150 recv_string_expect_success (rconn1, rconn1routing_id, 0);
151 recv_string_expect_success (rconn1, "ok", 0);
152
153 TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (rbind, my_endpoint));
154 test_context_socket_close (rbind);
155 test_context_socket_close (rconn1);
156}
157
158void test_router_2_router_while_receiving ()
159{
160 char buff[256];
161 const char msg[] = "hi 1";
162 const int zero = 0;
163 char x_endpoint[MAX_SOCKET_STRING];
164 char z_endpoint[MAX_SOCKET_STRING];
165
166 // Create xbind socket.
167 void *xbind = test_context_socket (ZMQ_ROUTER);
168 TEST_ASSERT_SUCCESS_ERRNO (
169 zmq_setsockopt (xbind, ZMQ_LINGER, &zero, sizeof (zero)));
170 bind_loopback_ipv4 (xbind, x_endpoint, sizeof x_endpoint);
171
172 // Create zbind socket.
173 void *zbind = test_context_socket (ZMQ_ROUTER);
174 TEST_ASSERT_SUCCESS_ERRNO (
175 zmq_setsockopt (zbind, ZMQ_LINGER, &zero, sizeof (zero)));
176 bind_loopback_ipv4 (zbind, z_endpoint, sizeof z_endpoint);
177
178 // Create connection socket.
179 void *yconn = test_context_socket (ZMQ_ROUTER);
180 TEST_ASSERT_SUCCESS_ERRNO (
181 zmq_setsockopt (yconn, ZMQ_LINGER, &zero, sizeof (zero)));
182
183 // set identities for each socket
184 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
185 xbind, ZMQ_ROUTING_ID, x_routing_id, strlen (x_routing_id)));
186 TEST_ASSERT_SUCCESS_ERRNO (
187 zmq_setsockopt (yconn, ZMQ_ROUTING_ID, y_routing_id, 2));
188 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
189 zbind, ZMQ_ROUTING_ID, z_routing_id, strlen (z_routing_id)));
190
191 // Connect Y to X using a routing id
192 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
193 yconn, ZMQ_CONNECT_ROUTING_ID, x_routing_id, strlen (x_routing_id)));
194 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (yconn, x_endpoint));
195
196 // Send some data from Y to X.
197 send_string_expect_success (yconn, x_routing_id, ZMQ_SNDMORE);
198 send_string_expect_success (yconn, msg, 0);
199
200 // wait for the Y->X message to be received
201 msleep (SETTLE_TIME);
202
203 // Now X tries to connect to Z and send a message
204 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
205 xbind, ZMQ_CONNECT_ROUTING_ID, z_routing_id, strlen (z_routing_id)));
206 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (xbind, z_endpoint));
207
208 // Try to send some data from X to Z.
209 send_string_expect_success (xbind, z_routing_id, ZMQ_SNDMORE);
210 send_string_expect_success (xbind, msg, 0);
211
212 // wait for the X->Z message to be received (so that our non-blocking check will actually
213 // fail if the message is routed to Y)
214 msleep (SETTLE_TIME);
215
216 // nothing should have been received on the Y socket
217 TEST_ASSERT_FAILURE_ERRNO (EAGAIN,
218 zmq_recv (yconn, buff, 256, ZMQ_DONTWAIT));
219
220 // the message should have been received on the Z socket
221 recv_string_expect_success (zbind, x_routing_id, 0);
222 recv_string_expect_success (zbind, msg, 0);
223
224 TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (xbind, x_endpoint));
225 TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (zbind, z_endpoint));
226
227 test_context_socket_close (yconn);
228 test_context_socket_close (xbind);
229 test_context_socket_close (zbind);
230}
231
232void test_router_2_router_unnamed ()
233{
234 test_router_2_router (false);
235}
236
237void test_router_2_router_named ()
238{
239 test_router_2_router (true);
240}
241
242int main ()
243{
244 setup_test_environment ();
245
246 UNITY_BEGIN ();
247 RUN_TEST (test_stream_2_stream);
248 RUN_TEST (test_router_2_router_unnamed);
249 RUN_TEST (test_router_2_router_named);
250 RUN_TEST (test_router_2_router_while_receiving);
251 return UNITY_END ();
252}
253