1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33SETUP_TEARDOWN_TESTCONTEXT
34
35void test_immediate_1 ()
36{
37 int val;
38 int rc;
39 char buffer[16];
40 size_t len = MAX_SOCKET_STRING;
41 char my_endpoint[MAX_SOCKET_STRING];
42 // TEST 1.
43 // First we're going to attempt to send messages to two
44 // pipes, one connected, the other not. We should see
45 // the PUSH load balancing to both pipes, and hence half
46 // of the messages getting queued, as connect() creates a
47 // pipe immediately.
48
49 void *to = test_context_socket (ZMQ_PULL);
50
51 // Bind the one valid receiver
52 val = 0;
53 TEST_ASSERT_SUCCESS_ERRNO (
54 zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val)));
55 bind_loopback_ipv4 (to, my_endpoint, len);
56
57 // Create a socket pushing to two endpoints - only 1 message should arrive.
58 void *from = test_context_socket (ZMQ_PUSH);
59
60 val = 0;
61 TEST_ASSERT_SUCCESS_ERRNO (
62 zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val)));
63 // This pipe will not connect (provided the ephemeral port is not 5556)
64 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tcp://localhost:5556"));
65 // This pipe will
66 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, my_endpoint));
67
68 msleep (SETTLE_TIME);
69
70 // We send 10 messages, 5 should just get stuck in the queue
71 // for the not-yet-connected pipe
72 for (int i = 0; i < 10; ++i) {
73 send_string_expect_success (from, "Hello", 0);
74 }
75
76 // We now consume from the connected pipe
77 // - we should see just 5
78 int timeout = 250;
79 TEST_ASSERT_SUCCESS_ERRNO (
80 zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
81
82 int seen = 0;
83 while (true) {
84 rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
85 if (rc == -1)
86 break; // Break when we didn't get a message
87 seen++;
88 }
89 TEST_ASSERT_EQUAL_INT (5, seen);
90
91 test_context_socket_close (from);
92 test_context_socket_close (to);
93}
94
95
96void test_immediate_2 ()
97{
98 // This time we will do the same thing, connect two pipes,
99 // one of which will succeed in connecting to a bound
100 // receiver, the other of which will fail. However, we will
101 // also set the delay attach on connect flag, which should
102 // cause the pipe attachment to be delayed until the connection
103 // succeeds.
104
105 // Bind the valid socket
106 void *to = test_context_socket (ZMQ_PULL);
107 size_t len = MAX_SOCKET_STRING;
108 char my_endpoint[MAX_SOCKET_STRING];
109 bind_loopback_ipv4 (to, my_endpoint, len);
110
111 int val = 0;
112 TEST_ASSERT_SUCCESS_ERRNO (
113 zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof (val)));
114
115 // Create a socket pushing to two endpoints - all messages should arrive.
116 void *from = test_context_socket (ZMQ_PUSH);
117
118 val = 0;
119 TEST_ASSERT_SUCCESS_ERRNO (
120 zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof (val)));
121
122 // Set the key flag
123 val = 1;
124 TEST_ASSERT_SUCCESS_ERRNO (
125 zmq_setsockopt (from, ZMQ_IMMEDIATE, &val, sizeof (val)));
126
127 // Connect to the invalid socket
128 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, "tcp://localhost:5561"));
129 // Connect to the valid socket
130 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (from, my_endpoint));
131
132 // Send 10 messages, all should be routed to the connected pipe
133 for (int i = 0; i < 10; ++i) {
134 send_string_expect_success (from, "Hello", 0);
135 }
136 int timeout = 250;
137 TEST_ASSERT_SUCCESS_ERRNO (
138 zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)));
139
140 int seen = 0;
141 while (true) {
142 char buffer[16];
143 int rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
144 if (rc == -1)
145 break; // Break when we didn't get a message
146 seen++;
147 }
148 TEST_ASSERT_EQUAL_INT (10, seen);
149
150 test_context_socket_close (from);
151 test_context_socket_close (to);
152}
153
154void test_immediate_3 ()
155{
156 // This time we want to validate that the same blocking behaviour
157 // occurs with an existing connection that is broken. We will send
158 // messages to a connected pipe, disconnect and verify the messages
159 // block. Then we reconnect and verify messages flow again.
160 void *backend = test_context_socket (ZMQ_DEALER);
161 void *frontend = test_context_socket (ZMQ_DEALER);
162
163 int zero = 0;
164 TEST_ASSERT_SUCCESS_ERRNO (
165 zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)));
166 TEST_ASSERT_SUCCESS_ERRNO (
167 zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero)));
168
169 // Frontend connects to backend using IMMEDIATE
170 int on = 1;
171 TEST_ASSERT_SUCCESS_ERRNO (
172 zmq_setsockopt (frontend, ZMQ_IMMEDIATE, &on, sizeof (on)));
173
174 size_t len = MAX_SOCKET_STRING;
175 char my_endpoint[MAX_SOCKET_STRING];
176 bind_loopback_ipv4 (backend, my_endpoint, len);
177
178 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (frontend, my_endpoint));
179
180 // Ping backend to frontend so we know when the connection is up
181 send_string_expect_success (backend, "Hello", 0);
182 recv_string_expect_success (frontend, "Hello", 0);
183
184 // Send message from frontend to backend
185 send_string_expect_success (frontend, "Hello", ZMQ_DONTWAIT);
186
187 test_context_socket_close (backend);
188
189 // Give time to process disconnect
190 msleep (SETTLE_TIME * 10);
191
192 // Send a message, should fail
193 TEST_ASSERT_FAILURE_ERRNO (EAGAIN,
194 zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT));
195
196 // Recreate backend socket
197 backend = test_context_socket (ZMQ_DEALER);
198 TEST_ASSERT_SUCCESS_ERRNO (
199 zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)));
200 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, my_endpoint));
201
202 // Ping backend to frontend so we know when the connection is up
203 send_string_expect_success (backend, "Hello", 0);
204 recv_string_expect_success (frontend, "Hello", 0);
205
206 // After the reconnect, should succeed
207 send_string_expect_success (frontend, "Hello", ZMQ_DONTWAIT);
208
209 test_context_socket_close (backend);
210 test_context_socket_close (frontend);
211}
212
213int main (void)
214{
215 setup_test_environment ();
216 UNITY_BEGIN ();
217 RUN_TEST (test_immediate_1);
218 RUN_TEST (test_immediate_2);
219 RUN_TEST (test_immediate_3);
220 return UNITY_END ();
221}
222