1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29#include "../include/zmq.h"
30
31#include <stdio.h>
32#include <stdlib.h>
33#include <string.h>
34#include <assert.h>
35#include <time.h>
36#include <stdarg.h>
37#include <string.h>
38#include <string>
39
40#include "platform.hpp"
41
42#if defined ZMQ_HAVE_WINDOWS
43#include <windows.h>
44#include <process.h>
45#else
46#include <pthread.h>
47#include <unistd.h>
48#endif
49
50
/*
    Asynchronous proxy benchmark using ZMQ_XPUB_NODROP.

    Topology:

      XPUB                      SUB
       |                         |
       +-----> XSUB -> XPUB -----/
       |       ^^^^^^^^^^^^
      XPUB      ZMQ proxy

    All connections use "inproc" transport. The two XPUB sockets start
    flooding the proxy. The throughput is computed from the messages
    received by the SUB socket.
*/
66
67
//  Value that set_hwm () passes for both ZMQ_SNDHWM and ZMQ_RCVHWM.
#define HWM 10000

//  Number of elements of a true array. Must not be used on pointers.
//  The argument is fully parenthesized so that any array-valued expression
//  can be passed safely.
#ifndef ARRAY_SIZE
#define ARRAY_SIZE(x) (sizeof (x) / sizeof ((x)[0]))
#endif

//  Wraps a call that reports failure by returning -1: forwards the result
//  together with the stringified expression to
//  test_assert_success_message_errno_helper (), which prints a diagnostic and
//  exits on failure, and otherwise yields the call's return value.
#define TEST_ASSERT_SUCCESS_ERRNO(expr)                                        \
    test_assert_success_message_errno_helper (expr, NULL, #expr)
76
77
//  Benchmark parameters. main () fills them in from the command line before
//  it starts any worker thread; the worker threads only read them.

//  Number of messages each publisher sends, and also the total number of
//  messages the subscriber waits for.
static uint64_t message_count = 0;

//  Payload size, in bytes, of every message created by the publishers.
static size_t message_size = 0;
80
81
//  Per-thread configuration handed to every worker thread of the benchmark.
typedef struct
{
    //  ZeroMQ context in which the thread creates its sockets.
    void *context;

    //  Selects which entry of frontend_endpoint (publishers) or
    //  backend_endpoint (subscribers) the thread connects to.
    int thread_idx;

    //  Endpoints the proxy binds its frontend socket to; unused slots
    //  are NULL and are skipped by the proxy.
    const char *frontend_endpoint[4];

    //  Endpoints the proxy binds its backend socket to; unused slots
    //  are NULL and are skipped by the proxy.
    const char *backend_endpoint[4];

    //  Endpoint of the proxy's control socket, used to ask it to terminate.
    const char *control_endpoint;
} proxy_hwm_cfg_t;
90
91
92int test_assert_success_message_errno_helper (int rc_,
93 const char *msg_,
94 const char *expr_)
95{
96 if (rc_ == -1) {
97 char buffer[512];
98 buffer[sizeof (buffer) - 1] =
99 0; // to ensure defined behavior with VC++ <= 2013
100 printf ("%s failed%s%s%s, errno = %i (%s)", expr_,
101 msg_ ? " (additional info: " : "", msg_ ? msg_ : "",
102 msg_ ? ")" : "", zmq_errno (), zmq_strerror (zmq_errno ()));
103 exit (1);
104 }
105 return rc_;
106}
107
108static void set_hwm (void *skt)
109{
110 int hwm = HWM;
111
112 TEST_ASSERT_SUCCESS_ERRNO (
113 zmq_setsockopt (skt, ZMQ_SNDHWM, &hwm, sizeof (hwm)));
114
115 TEST_ASSERT_SUCCESS_ERRNO (
116 zmq_setsockopt (skt, ZMQ_RCVHWM, &hwm, sizeof (hwm)));
117}
118
119static void publisher_thread_main (void *pvoid)
120{
121 const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
122 const int idx = cfg->thread_idx;
123 int optval;
124 int rc;
125
126 void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB);
127 assert (pubsocket);
128
129 set_hwm (pubsocket);
130
131 optval = 1;
132 TEST_ASSERT_SUCCESS_ERRNO (
133 zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval)));
134
135 optval = 1;
136 TEST_ASSERT_SUCCESS_ERRNO (
137 zmq_setsockopt (pubsocket, ZMQ_SNDTIMEO, &optval, sizeof (optval)));
138
139 TEST_ASSERT_SUCCESS_ERRNO (
140 zmq_connect (pubsocket, cfg->frontend_endpoint[idx]));
141
142 // Wait before starting TX operations till 1 subscriber has subscribed
143 // (in this test there's 1 subscriber only)
144 char buffer[32] = {};
145 rc = TEST_ASSERT_SUCCESS_ERRNO (
146 zmq_recv (pubsocket, buffer, sizeof (buffer), 0));
147 if (rc != 1) {
148 printf ("invalid response length: expected 1, received %d", rc);
149 exit (1);
150 }
151 if (buffer[0] != 1) {
152 printf ("invalid response value: expected 1, received %d",
153 (int) buffer[0]);
154 exit (1);
155 }
156
157 zmq_msg_t msg_orig;
158 rc = zmq_msg_init_size (&msg_orig, message_size);
159 assert (rc == 0);
160 memset (zmq_msg_data (&msg_orig), 'A', zmq_msg_size (&msg_orig));
161
162 uint64_t send_count = 0;
163 while (send_count < message_count) {
164 zmq_msg_t msg;
165 zmq_msg_init (&msg);
166 rc = zmq_msg_copy (&msg, &msg_orig);
167 assert (rc == 0);
168
169 // Send the message to the socket
170 rc = zmq_msg_send (&msg, pubsocket, 0);
171 if (rc != -1) {
172 send_count++;
173 } else {
174 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
175 }
176 }
177
178 zmq_close (pubsocket);
179 //printf ("publisher thread ended\n");
180}
181
182static void subscriber_thread_main (void *pvoid)
183{
184 const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
185 const int idx = cfg->thread_idx;
186
187 void *subsocket = zmq_socket (cfg->context, ZMQ_SUB);
188 assert (subsocket);
189
190 set_hwm (subsocket);
191
192 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0));
193
194 TEST_ASSERT_SUCCESS_ERRNO (
195 zmq_connect (subsocket, cfg->backend_endpoint[idx]));
196
197 // Receive message_count messages
198 uint64_t rxsuccess = 0;
199 bool success = true;
200 while (success) {
201 zmq_msg_t msg;
202 int rc = zmq_msg_init (&msg);
203 assert (rc == 0);
204
205 rc = zmq_msg_recv (&msg, subsocket, 0);
206 if (rc != -1) {
207 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
208 rxsuccess++;
209 }
210
211 if (rxsuccess == message_count)
212 break;
213 }
214
215 // Cleanup
216
217 zmq_close (subsocket);
218 //printf ("subscriber thread ended\n");
219}
220
221static void proxy_thread_main (void *pvoid)
222{
223 const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
224 int rc;
225
226 // FRONTEND SUB
227
228 void *frontend_xsub = zmq_socket (
229 cfg->context,
230 ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC)
231 assert (frontend_xsub);
232
233 set_hwm (frontend_xsub);
234
235 // Bind FRONTEND
236 for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) {
237 const char *ep = cfg->frontend_endpoint[i];
238 if (ep != NULL) {
239 assert (strlen (ep) > 5);
240 rc = zmq_bind (frontend_xsub, ep);
241 assert (rc == 0);
242 }
243 }
244
245 // BACKEND PUB
246
247 void *backend_xpub = zmq_socket (
248 cfg->context,
249 ZMQ_XPUB); // the backend is the one exposed to the external world (TCP)
250 assert (backend_xpub);
251
252 int optval = 1;
253 rc =
254 zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval));
255 assert (rc == 0);
256
257 set_hwm (backend_xpub);
258
259 // Bind BACKEND
260 for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) {
261 const char *ep = cfg->backend_endpoint[i];
262 if (ep != NULL) {
263 assert (strlen (ep) > 5);
264 rc = zmq_bind (backend_xpub, ep);
265 assert (rc == 0);
266 }
267 }
268
269 // CONTROL REP
270
271 void *control_rep = zmq_socket (
272 cfg->context,
273 ZMQ_REP); // This one is used by the proxy to receive&reply to commands
274 assert (control_rep);
275
276 // Bind CONTROL
277 rc = zmq_bind (control_rep, cfg->control_endpoint);
278 assert (rc == 0);
279
280 // Start proxying!
281
282 zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep);
283
284 zmq_close (frontend_xsub);
285 zmq_close (backend_xpub);
286 zmq_close (control_rep);
287 //printf ("proxy thread ended\n");
288}
289
290void terminate_proxy (const proxy_hwm_cfg_t *cfg)
291{
292 // CONTROL REQ
293
294 void *control_req = zmq_socket (
295 cfg->context,
296 ZMQ_REQ); // This one can be used to send command to the proxy
297 assert (control_req);
298
299 // Connect CONTROL-REQ: a socket to which send commands
300 int rc = zmq_connect (control_req, cfg->control_endpoint);
301 assert (rc == 0);
302
303 // Ask the proxy to exit: the subscriber has received all messages
304
305 rc = zmq_send (control_req, "TERMINATE", 9, 0);
306 assert (rc == 9);
307
308 zmq_close (control_req);
309}
310
311// The main thread simply starts some publishers, a proxy,
312// and a subscriber. Finish when all packets are received.
313
314int main (int argc, char *argv[])
315{
316 if (argc != 3) {
317 printf ("usage: proxy_thr <message-size> <message-count>\n");
318 return 1;
319 }
320
321 message_size = atoi (argv[1]);
322 message_count = atoi (argv[2]);
323 printf ("message size: %d [B]\n", (int) message_size);
324 printf ("message count: %d\n", (int) message_count);
325
326 void *context = zmq_ctx_new ();
327 assert (context);
328
329 int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4);
330 assert (rv == 0);
331
332 // START ALL SECONDARY THREADS
333
334 const char *pub1 = "inproc://perf_pub1";
335 const char *pub2 = "inproc://perf_pub2";
336 const char *sub1 = "inproc://perf_backend";
337
338 proxy_hwm_cfg_t cfg_global = {};
339 cfg_global.context = context;
340 cfg_global.frontend_endpoint[0] = pub1;
341 cfg_global.frontend_endpoint[1] = pub2;
342 cfg_global.backend_endpoint[0] = sub1;
343 cfg_global.control_endpoint = "inproc://ctrl";
344
345 // Proxy
346 proxy_hwm_cfg_t cfg_proxy = cfg_global;
347 void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy);
348 assert (proxy != 0);
349
350 // Subscriber 1
351 proxy_hwm_cfg_t cfg_sub1 = cfg_global;
352 cfg_sub1.thread_idx = 0;
353 void *subscriber =
354 zmq_threadstart (&subscriber_thread_main, (void *) &cfg_sub1);
355 assert (subscriber != 0);
356
357 // Start measuring
358 void *watch = zmq_stopwatch_start ();
359
360 // Publisher 1
361 proxy_hwm_cfg_t cfg_pub1 = cfg_global;
362 cfg_pub1.thread_idx = 0;
363 void *publisher1 =
364 zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1);
365 assert (publisher1 != 0);
366
367 // Publisher 2
368 proxy_hwm_cfg_t cfg_pub2 = cfg_global;
369 cfg_pub2.thread_idx = 1;
370 void *publisher2 =
371 zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2);
372 assert (publisher2 != 0);
373
374 // Wait for all packets to be received
375 zmq_threadclose (subscriber);
376
377 // Stop measuring
378 unsigned long elapsed = zmq_stopwatch_stop (watch);
379 if (elapsed == 0)
380 elapsed = 1;
381
382 unsigned long throughput =
383 (unsigned long) ((double) message_count / (double) elapsed * 1000000);
384 double megabits = (double) (throughput * message_size * 8) / 1000000;
385
386 printf ("mean throughput: %d [msg/s]\n", (int) throughput);
387 printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
388
389 // Wait for the end of publishers...
390 zmq_threadclose (publisher1);
391 zmq_threadclose (publisher2);
392
393 // ... then close the proxy
394 terminate_proxy (&cfg_proxy);
395 zmq_threadclose (proxy);
396
397 int rc = zmq_ctx_term (context);
398 assert (rc == 0);
399
400 return 0;
401}
402