1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32#include <string.h>
33#include <unity.h>
34#include <assert.h>
35#include <unistd.h>
36
37//
38// Asynchronous proxy test using ZMQ_XPUB_NODROP and HWM:
39//
40// Topology:
41//
//   XPUB                      SUB
//    |                         |
//    \-----> XSUB -> XPUB -----/
//           ^^^^^^^^^^^^^^
//             ZMQ proxy
47//
// All connections use "inproc" transport and have artificially-low HWMs set.
// Then the publisher's XPUB socket starts flooding the proxy. The SUB is
// artificially slow at receiving messages.
// This scenario simulates what happens when a SUB is slower than
// its (X)PUB: since ZMQ_XPUB_NODROP=1, the proxy's XPUB will block and then
// the publisher's (X)PUB socket will block as well.
// The exact number of messages that go through before the (X)PUB blocks depends
// on ZeroMQ internals and on how the OS schedules the different threads.
// Meanwhile, requesting statistics from the proxy must NOT block.
57//
58
59
60#define HWM 10
61#define NUM_BYTES_PER_MSG 50000
62
63
// Configuration shared by all threads of this test; a pointer to one instance
// is handed to every thread entry point as its argument.
typedef struct
{
    // ZeroMQ context from which every thread creates its socket(s)
    void *context;
    // endpoint bound by the proxy's XSUB socket; the publisher connects to it
    const char *frontend_endpoint;
    // endpoint bound by the proxy's XPUB socket; the subscriber connects to it
    const char *backend_endpoint;
    // endpoint bound by the proxy's REP socket; the stats asker connects to it
    const char *control_endpoint;

    // atomic counter incremented by the subscriber thread once it has stopped
    // receiving; polled by the stats asker thread to know when to terminate
    void *subscriber_received_all;
} proxy_hwm_cfg_t;
73
74static void lower_hwm (void *skt_)
75{
76 int send_hwm = HWM;
77 TEST_ASSERT_SUCCESS_ERRNO (
78 zmq_setsockopt (skt_, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)));
79
80 TEST_ASSERT_SUCCESS_ERRNO (
81 zmq_setsockopt (skt_, ZMQ_RCVHWM, &send_hwm, sizeof (send_hwm)));
82}
83
84static void publisher_thread_main (void *pvoid_)
85{
86 proxy_hwm_cfg_t *cfg = static_cast<proxy_hwm_cfg_t *> (pvoid_);
87
88 void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
89 assert (pubsocket);
90
91 lower_hwm (pubsocket);
92
93 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pubsocket, cfg->frontend_endpoint));
94
95 int optval = 1;
96 TEST_ASSERT_SUCCESS_ERRNO (
97 zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
98
99 // Wait before starting TX operations till 1 subscriber has subscribed
100 // (in this test there's 1 subscriber only)
101 const char subscription_to_all_topics[] = {1, 0};
102 recv_string_expect_success (pubsocket, subscription_to_all_topics, 0);
103
104 uint64_t send_count = 0;
105 while (true) {
106 zmq_msg_t msg;
107 int rc = zmq_msg_init_size (&msg, NUM_BYTES_PER_MSG);
108 assert (rc == 0);
109
110 /* Fill in message content with 'AAAAAA' */
111 memset (zmq_msg_data (&msg), 'A', NUM_BYTES_PER_MSG);
112
113 /* Send the message to the socket */
114 rc = zmq_msg_send (&msg, pubsocket, ZMQ_DONTWAIT);
115 if (rc != -1) {
116 send_count++;
117 } else {
118 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
119 break;
120 }
121 }
122
123 // VERIFY EXPECTED RESULTS
124 // EXPLANATION FOR TX TO BE CONSIDERED SUCCESSFUL:
125 // this test has 3 threads doing I/O across 2 queues. Depending on the scheduling,
126 // it might happen that 20, 30 or 40 messages go through before the pub blocks.
127 // That's because the receiver thread gets kicked once every (hwm_ + 1) / 2 sent
128 // messages (search for zeromq sources compute_lwm function).
129 // So depending on the scheduling of the second thread, the publisher might get one,
130 // two or three more batches in. The ceiling is 40 as there's 2 queues.
131 //
132 assert (4 * HWM >= send_count && 2 * HWM <= send_count);
133
134 // CLEANUP
135
136 zmq_close (pubsocket);
137}
138
139static void subscriber_thread_main (void *pvoid_)
140{
141 proxy_hwm_cfg_t *cfg = static_cast<proxy_hwm_cfg_t *> (pvoid_);
142
143 void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
144 assert (subsocket);
145
146 lower_hwm (subsocket);
147
148 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0));
149
150 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subsocket, cfg->backend_endpoint));
151
152
153 // receive all sent messages
154 uint64_t rxsuccess = 0;
155 bool success = true;
156 while (success) {
157 zmq_msg_t msg;
158 int rc = zmq_msg_init (&msg);
159 assert (rc == 0);
160
161 rc = zmq_msg_recv (&msg, subsocket, 0);
162 if (rc != -1) {
163 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
164 rxsuccess++;
165
166 // after receiving 1st message, set a finite timeout (default is infinite)
167 int timeout_ms = 100;
168 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
169 subsocket, ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms)));
170 } else {
171 break;
172 }
173
174 msleep (100);
175 }
176
177
178 // VERIFY EXPECTED RESULTS
179 // EXPLANATION FOR RX TO BE CONSIDERED SUCCESSFUL:
180 // see publisher thread why we have 3 possible outcomes as number of RX messages
181
182 assert (4 * HWM >= rxsuccess && 2 * HWM <= rxsuccess);
183
184 // INFORM THAT WE COMPLETED:
185
186 zmq_atomic_counter_inc (cfg->subscriber_received_all);
187
188 // CLEANUP
189
190 zmq_close (subsocket);
191}
192
193bool recv_stat (void *sock_, bool last_, uint64_t *res_)
194{
195 zmq_msg_t stats_msg;
196
197 int rc = zmq_msg_init (&stats_msg);
198 assert (rc == 0);
199
200 rc = zmq_msg_recv (&stats_msg, sock_, 0); //ZMQ_DONTWAIT);
201 if (rc == -1 && errno == EAGAIN) {
202 rc = zmq_msg_close (&stats_msg);
203 assert (rc == 0);
204 return false; // cannot retrieve the stat
205 }
206
207 assert (rc == sizeof (uint64_t));
208 memcpy (res_, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg));
209
210 rc = zmq_msg_close (&stats_msg);
211 assert (rc == 0);
212
213 int more;
214 size_t moresz = sizeof more;
215 rc = zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz);
216 assert (rc == 0);
217 assert ((last_ && !more) || (!last_ && more));
218
219 return true;
220}
221
222// Utility function to interrogate the proxy:
223
// Counters reported by the proxy for one of its sockets. The fields are
// filled, in declaration order, from consecutive frames of the reply to a
// "STATISTICS" request.
typedef struct
{
    uint64_t msg_in;
    uint64_t bytes_in;
    uint64_t msg_out;
    uint64_t bytes_out;
} zmq_socket_stats_t;
231
// Complete statistics reply of the proxy: the counters of its frontend socket
// followed by the counters of its backend socket.
typedef struct
{
    zmq_socket_stats_t frontend;
    zmq_socket_stats_t backend;
} zmq_proxy_stats_t;
237
238bool check_proxy_stats (void *control_proxy_)
239{
240 zmq_proxy_stats_t total_stats;
241 int rc;
242
243 rc = zmq_send (control_proxy_, "STATISTICS", 10, ZMQ_DONTWAIT);
244 assert (rc == 10 || (rc == -1 && errno == EAGAIN));
245 if (rc == -1 && errno == EAGAIN) {
246 return false;
247 }
248
249 // first frame of the reply contains FRONTEND stats:
250 if (!recv_stat (control_proxy_, false, &total_stats.frontend.msg_in)) {
251 return false;
252 }
253
254 recv_stat (control_proxy_, false, &total_stats.frontend.bytes_in);
255 recv_stat (control_proxy_, false, &total_stats.frontend.msg_out);
256 recv_stat (control_proxy_, false, &total_stats.frontend.bytes_out);
257
258 // second frame of the reply contains BACKEND stats:
259 recv_stat (control_proxy_, false, &total_stats.backend.msg_in);
260 recv_stat (control_proxy_, false, &total_stats.backend.bytes_in);
261 recv_stat (control_proxy_, false, &total_stats.backend.msg_out);
262 recv_stat (control_proxy_, true, &total_stats.backend.bytes_out);
263
264 return true;
265}
266
267static void proxy_stats_asker_thread_main (void *pvoid_)
268{
269 proxy_hwm_cfg_t *cfg = static_cast<proxy_hwm_cfg_t *> (pvoid_);
270
271
272 // CONTROL REQ
273
274 void *control_req =
275 zmq_socket (cfg->context,
276 ZMQ_REQ); // this one can be used to send command to the proxy
277 assert (control_req);
278
279 // connect CONTROL-REQ: a socket to which send commands
280 int rc = zmq_connect (control_req, cfg->control_endpoint);
281 assert (rc == 0);
282
283
284 // IMPORTANT: by setting the tx/rx timeouts, we avoid getting blocked when interrogating a proxy which is
285 // itself blocked in a zmq_msg_send() on its XPUB socket having ZMQ_XPUB_NODROP=1!
286
287 int optval = 10;
288 rc = zmq_setsockopt (control_req, ZMQ_SNDTIMEO, &optval, sizeof (optval));
289 assert (rc == 0);
290 rc = zmq_setsockopt (control_req, ZMQ_RCVTIMEO, &optval, sizeof (optval));
291 assert (rc == 0);
292
293 optval = 10;
294 rc =
295 zmq_setsockopt (control_req, ZMQ_REQ_CORRELATE, &optval, sizeof (optval));
296 assert (rc == 0);
297
298 rc =
299 zmq_setsockopt (control_req, ZMQ_REQ_RELAXED, &optval, sizeof (optval));
300 assert (rc == 0);
301
302
303 // Start!
304
305 while (!zmq_atomic_counter_value (cfg->subscriber_received_all)) {
306 check_proxy_stats (control_req);
307 usleep (1000); // 1ms -> in best case we will get 1000updates/second
308 }
309
310
311 // Ask the proxy to exit: the subscriber has received all messages
312
313 rc = zmq_send (control_req, "TERMINATE", 9, 0);
314 assert (rc == 9);
315
316 zmq_close (control_req);
317}
318
319static void proxy_thread_main (void *pvoid_)
320{
321 proxy_hwm_cfg_t *cfg = static_cast<proxy_hwm_cfg_t *> (pvoid_);
322 int rc;
323
324
325 // FRONTEND SUB
326
327 void *frontend_xsub = zmq_socket (
328 cfg->context,
329 ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC)
330 assert (frontend_xsub);
331
332 lower_hwm (frontend_xsub);
333
334 // bind FRONTEND
335 rc = zmq_bind (frontend_xsub, cfg->frontend_endpoint);
336 assert (rc == 0);
337
338
339 // BACKEND PUB
340
341 void *backend_xpub = zmq_socket (
342 cfg->context,
343 ZMQ_XPUB); // the backend is the one exposed to the external world (TCP)
344 assert (backend_xpub);
345
346 int optval = 1;
347 rc =
348 zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval));
349 assert (rc == 0);
350
351 lower_hwm (backend_xpub);
352
353 // bind BACKEND
354 rc = zmq_bind (backend_xpub, cfg->backend_endpoint);
355 assert (rc == 0);
356
357
358 // CONTROL REP
359
360 void *control_rep = zmq_socket (
361 cfg->context,
362 ZMQ_REP); // this one is used by the proxy to receive&reply to commands
363 assert (control_rep);
364
365 // bind CONTROL
366 rc = zmq_bind (control_rep, cfg->control_endpoint);
367 assert (rc == 0);
368
369
370 // start proxying!
371
372 zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep);
373
374 zmq_close (frontend_xsub);
375 zmq_close (backend_xpub);
376 zmq_close (control_rep);
377}
378
379
// The main thread simply starts the proxy, publisher, subscriber and
// statistics-asker threads, and then waits for all of them to finish.
382
383int main (void)
384{
385 setup_test_environment ();
386
387 void *context = zmq_ctx_new ();
388 assert (context);
389
390
391 // START ALL SECONDARY THREADS
392
393 proxy_hwm_cfg_t cfg;
394 cfg.context = context;
395 cfg.frontend_endpoint = "inproc://frontend";
396 cfg.backend_endpoint = "inproc://backend";
397 cfg.control_endpoint = "inproc://ctrl";
398 cfg.subscriber_received_all = zmq_atomic_counter_new ();
399
400 void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg);
401 assert (proxy != 0);
402 void *publisher = zmq_threadstart (&publisher_thread_main, (void *) &cfg);
403 assert (publisher != 0);
404 void *subscriber = zmq_threadstart (&subscriber_thread_main, (void *) &cfg);
405 assert (subscriber != 0);
406 void *asker =
407 zmq_threadstart (&proxy_stats_asker_thread_main, (void *) &cfg);
408 assert (asker != 0);
409
410
411 // CLEANUP
412
413 zmq_threadclose (publisher);
414 zmq_threadclose (subscriber);
415 zmq_threadclose (asker);
416 zmq_threadclose (proxy);
417
418 int rc = zmq_ctx_term (context);
419 assert (rc == 0);
420
421 zmq_atomic_counter_destroy (&cfg.subscriber_received_all);
422
423 return 0;
424}
425