1 | /* |
2 | Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file |
3 | |
4 | This file is part of libzmq, the ZeroMQ core engine in C++. |
5 | |
6 | libzmq is free software; you can redistribute it and/or modify it under |
7 | the terms of the GNU Lesser General Public License (LGPL) as published |
8 | by the Free Software Foundation; either version 3 of the License, or |
9 | (at your option) any later version. |
10 | |
11 | As a special exception, the Contributors give you permission to link |
12 | this library with independent modules to produce an executable, |
13 | regardless of the license terms of these independent modules, and to |
14 | copy and distribute the resulting executable under terms of your choice, |
15 | provided that you also meet, for each linked independent module, the |
16 | terms and conditions of the license of that module. An independent |
17 | module is a module which is not derived from or based on this library. |
18 | If you modify this library, you must extend this exception to your |
19 | version of the library. |
20 | |
21 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
22 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
23 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
24 | License for more details. |
25 | |
26 | You should have received a copy of the GNU Lesser General Public License |
27 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
28 | */ |
29 | #include "../include/zmq.h" |
30 | |
31 | #include <stdio.h> |
32 | #include <stdlib.h> |
33 | #include <string.h> |
34 | #include <assert.h> |
35 | #include <time.h> |
36 | #include <stdarg.h> |
37 | #include <string.h> |
38 | #include <string> |
39 | |
40 | #include "platform.hpp" |
41 | |
42 | #if defined ZMQ_HAVE_WINDOWS |
43 | #include <windows.h> |
44 | #include <process.h> |
45 | #else |
46 | #include <pthread.h> |
47 | #include <unistd.h> |
48 | #endif |
49 | |
50 | |
51 | /* |
52 | Asynchronous proxy benchmark using ZMQ_XPUB_NODROP. |
53 | |
54 | Topology: |
55 | |
56 | XPUB SUB |
57 | | | |
58 | +-----> XSUB -> XPUB -----/ |
59 | | ^^^^^^^^^^^^ |
60 | XPUB ZMQ proxy |
61 | |
62 | All connections use "inproc" transport. The two XPUB sockets start |
63 | flooding the proxy. The throughput is computed using the bytes received |
64 | in the SUB socket. |
65 | */ |
66 | |
67 | |
68 | #define HWM 10000 |
69 | |
70 | #ifndef ARRAY_SIZE |
71 | #define ARRAY_SIZE(x) (sizeof (x) / sizeof (*x)) |
72 | #endif |
73 | |
74 | #define TEST_ASSERT_SUCCESS_ERRNO(expr) \ |
75 | test_assert_success_message_errno_helper (expr, NULL, #expr) |
76 | |
77 | |
78 | static uint64_t message_count = 0; |
79 | static size_t message_size = 0; |
80 | |
81 | |
82 | typedef struct |
83 | { |
84 | void *context; |
85 | int thread_idx; |
86 | const char *frontend_endpoint[4]; |
87 | const char *backend_endpoint[4]; |
88 | const char *control_endpoint; |
89 | } proxy_hwm_cfg_t; |
90 | |
91 | |
92 | int test_assert_success_message_errno_helper (int rc_, |
93 | const char *msg_, |
94 | const char *expr_) |
95 | { |
96 | if (rc_ == -1) { |
97 | char buffer[512]; |
98 | buffer[sizeof (buffer) - 1] = |
99 | 0; // to ensure defined behavior with VC++ <= 2013 |
100 | printf ("%s failed%s%s%s, errno = %i (%s)" , expr_, |
101 | msg_ ? " (additional info: " : "" , msg_ ? msg_ : "" , |
102 | msg_ ? ")" : "" , zmq_errno (), zmq_strerror (zmq_errno ())); |
103 | exit (1); |
104 | } |
105 | return rc_; |
106 | } |
107 | |
108 | static void set_hwm (void *skt) |
109 | { |
110 | int hwm = HWM; |
111 | |
112 | TEST_ASSERT_SUCCESS_ERRNO ( |
113 | zmq_setsockopt (skt, ZMQ_SNDHWM, &hwm, sizeof (hwm))); |
114 | |
115 | TEST_ASSERT_SUCCESS_ERRNO ( |
116 | zmq_setsockopt (skt, ZMQ_RCVHWM, &hwm, sizeof (hwm))); |
117 | } |
118 | |
119 | static void publisher_thread_main (void *pvoid) |
120 | { |
121 | const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; |
122 | const int idx = cfg->thread_idx; |
123 | int optval; |
124 | int rc; |
125 | |
126 | void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB); |
127 | assert (pubsocket); |
128 | |
129 | set_hwm (pubsocket); |
130 | |
131 | optval = 1; |
132 | TEST_ASSERT_SUCCESS_ERRNO ( |
133 | zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval))); |
134 | |
135 | optval = 1; |
136 | TEST_ASSERT_SUCCESS_ERRNO ( |
137 | zmq_setsockopt (pubsocket, ZMQ_SNDTIMEO, &optval, sizeof (optval))); |
138 | |
139 | TEST_ASSERT_SUCCESS_ERRNO ( |
140 | zmq_connect (pubsocket, cfg->frontend_endpoint[idx])); |
141 | |
142 | // Wait before starting TX operations till 1 subscriber has subscribed |
143 | // (in this test there's 1 subscriber only) |
144 | char buffer[32] = {}; |
145 | rc = TEST_ASSERT_SUCCESS_ERRNO ( |
146 | zmq_recv (pubsocket, buffer, sizeof (buffer), 0)); |
147 | if (rc != 1) { |
148 | printf ("invalid response length: expected 1, received %d" , rc); |
149 | exit (1); |
150 | } |
151 | if (buffer[0] != 1) { |
152 | printf ("invalid response value: expected 1, received %d" , |
153 | (int) buffer[0]); |
154 | exit (1); |
155 | } |
156 | |
157 | zmq_msg_t msg_orig; |
158 | rc = zmq_msg_init_size (&msg_orig, message_size); |
159 | assert (rc == 0); |
160 | memset (zmq_msg_data (&msg_orig), 'A', zmq_msg_size (&msg_orig)); |
161 | |
162 | uint64_t send_count = 0; |
163 | while (send_count < message_count) { |
164 | zmq_msg_t msg; |
165 | zmq_msg_init (&msg); |
166 | rc = zmq_msg_copy (&msg, &msg_orig); |
167 | assert (rc == 0); |
168 | |
169 | // Send the message to the socket |
170 | rc = zmq_msg_send (&msg, pubsocket, 0); |
171 | if (rc != -1) { |
172 | send_count++; |
173 | } else { |
174 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); |
175 | } |
176 | } |
177 | |
178 | zmq_close (pubsocket); |
179 | //printf ("publisher thread ended\n"); |
180 | } |
181 | |
182 | static void subscriber_thread_main (void *pvoid) |
183 | { |
184 | const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; |
185 | const int idx = cfg->thread_idx; |
186 | |
187 | void *subsocket = zmq_socket (cfg->context, ZMQ_SUB); |
188 | assert (subsocket); |
189 | |
190 | set_hwm (subsocket); |
191 | |
192 | TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0)); |
193 | |
194 | TEST_ASSERT_SUCCESS_ERRNO ( |
195 | zmq_connect (subsocket, cfg->backend_endpoint[idx])); |
196 | |
197 | // Receive message_count messages |
198 | uint64_t rxsuccess = 0; |
199 | bool success = true; |
200 | while (success) { |
201 | zmq_msg_t msg; |
202 | int rc = zmq_msg_init (&msg); |
203 | assert (rc == 0); |
204 | |
205 | rc = zmq_msg_recv (&msg, subsocket, 0); |
206 | if (rc != -1) { |
207 | TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); |
208 | rxsuccess++; |
209 | } |
210 | |
211 | if (rxsuccess == message_count) |
212 | break; |
213 | } |
214 | |
215 | // Cleanup |
216 | |
217 | zmq_close (subsocket); |
218 | //printf ("subscriber thread ended\n"); |
219 | } |
220 | |
221 | static void proxy_thread_main (void *pvoid) |
222 | { |
223 | const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; |
224 | int rc; |
225 | |
226 | // FRONTEND SUB |
227 | |
228 | void *frontend_xsub = zmq_socket ( |
229 | cfg->context, |
230 | ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC) |
231 | assert (frontend_xsub); |
232 | |
233 | set_hwm (frontend_xsub); |
234 | |
235 | // Bind FRONTEND |
236 | for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) { |
237 | const char *ep = cfg->frontend_endpoint[i]; |
238 | if (ep != NULL) { |
239 | assert (strlen (ep) > 5); |
240 | rc = zmq_bind (frontend_xsub, ep); |
241 | assert (rc == 0); |
242 | } |
243 | } |
244 | |
245 | // BACKEND PUB |
246 | |
247 | void *backend_xpub = zmq_socket ( |
248 | cfg->context, |
249 | ZMQ_XPUB); // the backend is the one exposed to the external world (TCP) |
250 | assert (backend_xpub); |
251 | |
252 | int optval = 1; |
253 | rc = |
254 | zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval)); |
255 | assert (rc == 0); |
256 | |
257 | set_hwm (backend_xpub); |
258 | |
259 | // Bind BACKEND |
260 | for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) { |
261 | const char *ep = cfg->backend_endpoint[i]; |
262 | if (ep != NULL) { |
263 | assert (strlen (ep) > 5); |
264 | rc = zmq_bind (backend_xpub, ep); |
265 | assert (rc == 0); |
266 | } |
267 | } |
268 | |
269 | // CONTROL REP |
270 | |
271 | void *control_rep = zmq_socket ( |
272 | cfg->context, |
273 | ZMQ_REP); // This one is used by the proxy to receive&reply to commands |
274 | assert (control_rep); |
275 | |
276 | // Bind CONTROL |
277 | rc = zmq_bind (control_rep, cfg->control_endpoint); |
278 | assert (rc == 0); |
279 | |
280 | // Start proxying! |
281 | |
282 | zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep); |
283 | |
284 | zmq_close (frontend_xsub); |
285 | zmq_close (backend_xpub); |
286 | zmq_close (control_rep); |
287 | //printf ("proxy thread ended\n"); |
288 | } |
289 | |
290 | void terminate_proxy (const proxy_hwm_cfg_t *cfg) |
291 | { |
292 | // CONTROL REQ |
293 | |
294 | void *control_req = zmq_socket ( |
295 | cfg->context, |
296 | ZMQ_REQ); // This one can be used to send command to the proxy |
297 | assert (control_req); |
298 | |
299 | // Connect CONTROL-REQ: a socket to which send commands |
300 | int rc = zmq_connect (control_req, cfg->control_endpoint); |
301 | assert (rc == 0); |
302 | |
303 | // Ask the proxy to exit: the subscriber has received all messages |
304 | |
305 | rc = zmq_send (control_req, "TERMINATE" , 9, 0); |
306 | assert (rc == 9); |
307 | |
308 | zmq_close (control_req); |
309 | } |
310 | |
311 | // The main thread simply starts some publishers, a proxy, |
312 | // and a subscriber. Finish when all packets are received. |
313 | |
314 | int main (int argc, char *argv[]) |
315 | { |
316 | if (argc != 3) { |
317 | printf ("usage: proxy_thr <message-size> <message-count>\n" ); |
318 | return 1; |
319 | } |
320 | |
321 | message_size = atoi (argv[1]); |
322 | message_count = atoi (argv[2]); |
323 | printf ("message size: %d [B]\n" , (int) message_size); |
324 | printf ("message count: %d\n" , (int) message_count); |
325 | |
326 | void *context = zmq_ctx_new (); |
327 | assert (context); |
328 | |
329 | int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4); |
330 | assert (rv == 0); |
331 | |
332 | // START ALL SECONDARY THREADS |
333 | |
334 | const char *pub1 = "inproc://perf_pub1" ; |
335 | const char *pub2 = "inproc://perf_pub2" ; |
336 | const char *sub1 = "inproc://perf_backend" ; |
337 | |
338 | proxy_hwm_cfg_t cfg_global = {}; |
339 | cfg_global.context = context; |
340 | cfg_global.frontend_endpoint[0] = pub1; |
341 | cfg_global.frontend_endpoint[1] = pub2; |
342 | cfg_global.backend_endpoint[0] = sub1; |
343 | cfg_global.control_endpoint = "inproc://ctrl" ; |
344 | |
345 | // Proxy |
346 | proxy_hwm_cfg_t cfg_proxy = cfg_global; |
347 | void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy); |
348 | assert (proxy != 0); |
349 | |
350 | // Subscriber 1 |
351 | proxy_hwm_cfg_t cfg_sub1 = cfg_global; |
352 | cfg_sub1.thread_idx = 0; |
353 | void *subscriber = |
354 | zmq_threadstart (&subscriber_thread_main, (void *) &cfg_sub1); |
355 | assert (subscriber != 0); |
356 | |
357 | // Start measuring |
358 | void *watch = zmq_stopwatch_start (); |
359 | |
360 | // Publisher 1 |
361 | proxy_hwm_cfg_t cfg_pub1 = cfg_global; |
362 | cfg_pub1.thread_idx = 0; |
363 | void *publisher1 = |
364 | zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1); |
365 | assert (publisher1 != 0); |
366 | |
367 | // Publisher 2 |
368 | proxy_hwm_cfg_t cfg_pub2 = cfg_global; |
369 | cfg_pub2.thread_idx = 1; |
370 | void *publisher2 = |
371 | zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2); |
372 | assert (publisher2 != 0); |
373 | |
374 | // Wait for all packets to be received |
375 | zmq_threadclose (subscriber); |
376 | |
377 | // Stop measuring |
378 | unsigned long elapsed = zmq_stopwatch_stop (watch); |
379 | if (elapsed == 0) |
380 | elapsed = 1; |
381 | |
382 | unsigned long throughput = |
383 | (unsigned long) ((double) message_count / (double) elapsed * 1000000); |
384 | double megabits = (double) (throughput * message_size * 8) / 1000000; |
385 | |
386 | printf ("mean throughput: %d [msg/s]\n" , (int) throughput); |
387 | printf ("mean throughput: %.3f [Mb/s]\n" , (double) megabits); |
388 | |
389 | // Wait for the end of publishers... |
390 | zmq_threadclose (publisher1); |
391 | zmq_threadclose (publisher2); |
392 | |
393 | // ... then close the proxy |
394 | terminate_proxy (&cfg_proxy); |
395 | zmq_threadclose (proxy); |
396 | |
397 | int rc = zmq_ctx_term (context); |
398 | assert (rc == 0); |
399 | |
400 | return 0; |
401 | } |
402 | |