1 | /* |
2 | Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | |
30 | #include "testutil.hpp" |
31 | #include "testutil_unity.hpp" |
32 | #include "testutil_security.hpp" |
33 | |
34 | SETUP_TEARDOWN_TESTCONTEXT |
35 | |
36 | void test_send_one_connected_one_unconnected () |
37 | { |
38 | int val; |
39 | // TEST 1. |
40 | // First we're going to attempt to send messages to two |
41 | // pipes, one connected, the other not. We should see |
42 | // the PUSH load balancing to both pipes, and hence half |
43 | // of the messages getting queued, as connect() creates a |
44 | // pipe immediately. |
45 | |
46 | void *to = test_context_socket (ZMQ_PULL); |
47 | int timeout = 5000; |
48 | TEST_ASSERT_SUCCESS_ERRNO ( |
49 | zmq_setsockopt (to, ZMQ_LINGER, &timeout, sizeof (timeout))); |
50 | |
51 | // Bind the one valid receiver |
52 | val = 0; |
53 | TEST_ASSERT_SUCCESS_ERRNO ( |
54 | zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val))); |
55 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (to, "tipc://{6555,0,0}" )); |
56 | |
57 | // Create a socket pushing to two endpoints - only 1 message should arrive. |
58 | void *from = test_context_socket (ZMQ_PUSH); |
59 | |
60 | val = 0; |
61 | TEST_ASSERT_SUCCESS_ERRNO ( |
62 | zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val))); |
63 | TEST_ASSERT_SUCCESS_ERRNO ( |
64 | zmq_setsockopt (from, ZMQ_LINGER, &timeout, sizeof (timeout))); |
65 | // This pipe will not connect |
66 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tipc://{5556,0}@0.0.0" )); |
67 | // This pipe will |
68 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tipc://{6555,0}@0.0.0" )); |
69 | |
70 | // We send 10 messages, 5 should just get stuck in the queue |
71 | // for the not-yet-connected pipe |
72 | const int send_count = 10; |
73 | for (int i = 0; i < send_count; ++i) { |
74 | send_string_expect_success (from, "Hello" , 0); |
75 | } |
76 | |
77 | // We now consume from the connected pipe |
78 | // - we should see just 5 |
79 | timeout = SETTLE_TIME; |
80 | TEST_ASSERT_SUCCESS_ERRNO ( |
81 | zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int))); |
82 | |
83 | int seen = 0; |
84 | while (true) { |
85 | char buffer[16]; |
86 | int rc = zmq_recv (to, &buffer, sizeof (buffer), 0); |
87 | if (rc == -1) { |
88 | TEST_ASSERT_EQUAL_INT (EAGAIN, zmq_errno ()); |
89 | break; // Break when we didn't get a message |
90 | } |
91 | seen++; |
92 | } |
93 | TEST_ASSERT_EQUAL_INT (send_count / 2, seen); |
94 | |
95 | test_context_socket_close (from); |
96 | test_context_socket_close (to); |
97 | } |
98 | |
99 | void test_send_one_connected_one_unconnected_with_delay () |
100 | { |
101 | int val; |
102 | |
103 | // TEST 2 |
104 | // This time we will do the same thing, connect two pipes, |
105 | // one of which will succeed in connecting to a bound |
106 | // receiver, the other of which will fail. However, we will |
107 | // also set the delay attach on connect flag, which should |
108 | // cause the pipe attachment to be delayed until the connection |
109 | // succeeds. |
110 | |
111 | // Bind the valid socket |
112 | void *to = test_context_socket (ZMQ_PULL); |
113 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (to, "tipc://{5560,0,0}" )); |
114 | int timeout = 5000; |
115 | TEST_ASSERT_SUCCESS_ERRNO ( |
116 | zmq_setsockopt (to, ZMQ_LINGER, &timeout, sizeof (timeout))); |
117 | |
118 | val = 0; |
119 | TEST_ASSERT_SUCCESS_ERRNO ( |
120 | zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val))); |
121 | |
122 | // Create a socket pushing to two endpoints - all messages should arrive. |
123 | void *from = test_context_socket (ZMQ_PUSH); |
124 | |
125 | val = 0; |
126 | TEST_ASSERT_SUCCESS_ERRNO ( |
127 | zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val))); |
128 | TEST_ASSERT_SUCCESS_ERRNO ( |
129 | zmq_setsockopt (from, ZMQ_LINGER, &timeout, sizeof (timeout))); |
130 | |
131 | // Set the key flag |
132 | val = 1; |
133 | TEST_ASSERT_SUCCESS_ERRNO ( |
134 | zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof (val))); |
135 | |
136 | // Connect to the invalid socket |
137 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tipc://{5561,0}@0.0.0" )); |
138 | // Connect to the valid socket |
139 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tipc://{5560,0}@0.0.0" )); |
140 | |
141 | // Send 10 messages, all should be routed to the connected pipe |
142 | const int send_count = 10; |
143 | for (int i = 0; i < send_count; ++i) { |
144 | send_string_expect_success (from, "Hello" , 0); |
145 | } |
146 | timeout = SETTLE_TIME; |
147 | TEST_ASSERT_SUCCESS_ERRNO ( |
148 | zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int))); |
149 | |
150 | int seen = 0; |
151 | while (true) { |
152 | char buffer[16]; |
153 | int rc = zmq_recv (to, &buffer, sizeof (buffer), 0); |
154 | if (rc == -1) { |
155 | TEST_ASSERT_EQUAL_INT (EAGAIN, zmq_errno ()); |
156 | break; // Break when we didn't get a message |
157 | } |
158 | seen++; |
159 | } |
160 | TEST_ASSERT_EQUAL_INT (send_count, seen); |
161 | |
162 | test_context_socket_close (from); |
163 | test_context_socket_close (to); |
164 | } |
165 | |
166 | void test_send_disconnected_with_delay () |
167 | { |
168 | // TEST 3 |
169 | // This time we want to validate that the same blocking behaviour |
170 | // occurs with an existing connection that is broken. We will send |
171 | // messages to a connected pipe, disconnect and verify the messages |
172 | // block. Then we reconnect and verify messages flow again. |
173 | void *backend = test_context_socket (ZMQ_DEALER); |
174 | void *frontend = test_context_socket (ZMQ_DEALER); |
175 | void *monitor = test_context_socket (ZMQ_PAIR); |
176 | int rc; |
177 | int zero = 0; |
178 | TEST_ASSERT_SUCCESS_ERRNO ( |
179 | zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero))); |
180 | TEST_ASSERT_SUCCESS_ERRNO ( |
181 | zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero))); |
182 | TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor (frontend, "inproc://monitor" , |
183 | ZMQ_EVENT_DISCONNECTED)); |
184 | int timeout = 5000; |
185 | TEST_ASSERT_SUCCESS_ERRNO ( |
186 | zmq_setsockopt (backend, ZMQ_LINGER, &timeout, sizeof (timeout))); |
187 | TEST_ASSERT_SUCCESS_ERRNO ( |
188 | zmq_setsockopt (frontend, ZMQ_LINGER, &timeout, sizeof (timeout))); |
189 | |
190 | // Frontend connects to backend using DELAY_ATTACH_ON_CONNECT |
191 | int on = 1; |
192 | TEST_ASSERT_SUCCESS_ERRNO ( |
193 | zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on))); |
194 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "tipc://{5560,0,0}" )); |
195 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (monitor, "inproc://monitor" )); |
196 | TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (frontend, "tipc://{5560,0}@0.0.0" )); |
197 | |
198 | // Ping backend to frontend so we know when the connection is up |
199 | send_string_expect_success (backend, "Hello" , 0); |
200 | recv_string_expect_success (frontend, "Hello" , 0); |
201 | |
202 | // Send message from frontend to backend |
203 | send_string_expect_success (frontend, "Hello" , ZMQ_DONTWAIT); |
204 | |
205 | test_context_socket_close (backend); |
206 | |
207 | // Wait for disconnect to happen |
208 | expect_monitor_event (monitor, ZMQ_EVENT_DISCONNECTED); |
209 | |
210 | // Send a message, might succeed depending on scheduling of the I/O thread |
211 | do { |
212 | rc = zmq_send (frontend, "Hello" , 5, ZMQ_DONTWAIT); |
213 | TEST_ASSERT_TRUE (rc == 5 || (rc == -1 && zmq_errno () == EAGAIN)); |
214 | } while (rc == 5); |
215 | |
216 | // Recreate backend socket |
217 | backend = test_context_socket (ZMQ_DEALER); |
218 | TEST_ASSERT_SUCCESS_ERRNO ( |
219 | zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero))); |
220 | TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "tipc://{5560,0,0}" )); |
221 | |
222 | // Ping backend to frontend so we know when the connection is up |
223 | send_string_expect_success (backend, "Hello" , 0); |
224 | recv_string_expect_success (frontend, "Hello" , 0); |
225 | |
226 | // After the reconnect, should succeed |
227 | send_string_expect_success (frontend, "Hello" , ZMQ_DONTWAIT); |
228 | |
229 | test_context_socket_close (monitor); |
230 | test_context_socket_close (backend); |
231 | test_context_socket_close (frontend); |
232 | } |
233 | |
234 | int main (void) |
235 | { |
236 | if (!is_tipc_available ()) { |
237 | printf ("TIPC environment unavailable, skipping test\n" ); |
238 | return 77; |
239 | } |
240 | |
241 | UNITY_BEGIN (); |
242 | RUN_TEST (test_send_one_connected_one_unconnected); |
243 | RUN_TEST (test_send_one_connected_one_unconnected_with_delay); |
244 | RUN_TEST (test_send_disconnected_with_delay); |
245 | return UNITY_END (); |
246 | } |
247 | |