1/*
2 Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
30#include "testutil.hpp"
31#include "testutil_unity.hpp"
32
33#include <stdlib.h>
34#include <string.h>
35
36SETUP_TEARDOWN_TESTCONTEXT
37
38#define CONTENT_SIZE 13
39#define CONTENT_SIZE_MAX 32
40#define ROUTING_ID_SIZE 10
41#define ROUTING_ID_SIZE_MAX 32
42#define QT_WORKERS 5
43#define QT_CLIENTS 3
44#define is_verbose 0
45
46struct thread_data
47{
48 int id;
49};
50
51typedef struct
52{
53 uint64_t msg_in;
54 uint64_t bytes_in;
55 uint64_t msg_out;
56 uint64_t bytes_out;
57} zmq_socket_stats_t;
58
59typedef struct
60{
61 zmq_socket_stats_t frontend;
62 zmq_socket_stats_t backend;
63} zmq_proxy_stats_t;
64
65void *g_clients_pkts_out = NULL;
66void *g_workers_pkts_out = NULL;
67
68// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
69//
70// While this example runs in a single process, that is to make
71// it easier to start and stop the example. Each task may have its own
72// context and conceptually acts as a separate process. To have this
73// behaviour, it is necessary to replace the inproc transport of the
74// control socket by a tcp transport.
75
76// This is our client task
77// It connects to the server, and then sends a request once per second
78// It collects responses as they arrive, and it prints them out. We will
79// run several client tasks in parallel, each with a different random ID.
80
81static void client_task (void *db_)
82{
83 struct thread_data *databag = static_cast<struct thread_data *> (db_);
84 // Endpoint socket gets random port to avoid test failing when port in use
85 void *endpoint = zmq_socket (get_test_context (), ZMQ_PAIR);
86 TEST_ASSERT_NOT_NULL (endpoint);
87 int linger = 0;
88 TEST_ASSERT_SUCCESS_ERRNO (
89 zmq_setsockopt (endpoint, ZMQ_LINGER, &linger, sizeof (linger)));
90 char endpoint_source[256];
91 sprintf (endpoint_source, "inproc://endpoint%d", databag->id);
92 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (endpoint, endpoint_source));
93 char *my_endpoint = s_recv (endpoint);
94 TEST_ASSERT_NOT_NULL (my_endpoint);
95
96 void *client = zmq_socket (get_test_context (), ZMQ_DEALER);
97 TEST_ASSERT_NOT_NULL (client);
98
99 // Control socket receives terminate command from main over inproc
100 void *control = zmq_socket (get_test_context (), ZMQ_SUB);
101 TEST_ASSERT_NOT_NULL (control);
102 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
103 TEST_ASSERT_SUCCESS_ERRNO (
104 zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
105 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
106
107 char content[CONTENT_SIZE_MAX] = {};
108 // Set random routing id to make tracing easier
109 char routing_id[ROUTING_ID_SIZE] = {};
110 sprintf (routing_id, "%04X-%04X", rand () % 0xFFFF, rand () % 0xFFFF);
111 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
112 client, ZMQ_ROUTING_ID, routing_id,
113 ROUTING_ID_SIZE)); // includes '\0' as an helper for printf
114 linger = 0;
115 TEST_ASSERT_SUCCESS_ERRNO (
116 zmq_setsockopt (client, ZMQ_LINGER, &linger, sizeof (linger)));
117 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint));
118
119 zmq_pollitem_t items[] = {{client, 0, ZMQ_POLLIN, 0},
120 {control, 0, ZMQ_POLLIN, 0}};
121 int request_nbr = 0;
122 bool run = true;
123 bool keep_sending = true;
124 while (run) {
125 // Tick once per 200 ms, pulling in arriving messages
126 int centitick;
127 for (centitick = 0; centitick < 20; centitick++) {
128 zmq_poll (items, 2, 10);
129 if (items[0].revents & ZMQ_POLLIN) {
130 int rcvmore;
131 size_t sz = sizeof (rcvmore);
132 int rc = TEST_ASSERT_SUCCESS_ERRNO (
133 zmq_recv (client, content, CONTENT_SIZE_MAX, 0));
134 TEST_ASSERT_EQUAL_INT (CONTENT_SIZE, rc);
135 if (is_verbose)
136 printf (
137 "client receive - routing_id = %s content = %s\n",
138 routing_id, content);
139 // Check that message is still the same
140 TEST_ASSERT_EQUAL_STRING_LEN ("request #", content, 9);
141 TEST_ASSERT_SUCCESS_ERRNO (
142 zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz));
143 TEST_ASSERT_FALSE (rcvmore);
144 }
145 if (items[1].revents & ZMQ_POLLIN) {
146 int rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
147
148 if (rc > 0) {
149 content[rc] = 0; // NULL-terminate the command string
150 if (is_verbose)
151 printf (
152 "client receive - routing_id = %s command = %s\n",
153 routing_id, content);
154 if (memcmp (content, "TERMINATE", 9) == 0) {
155 run = false;
156 break;
157 }
158 if (memcmp (content, "STOP", 4) == 0) {
159 keep_sending = false;
160 break;
161 }
162 }
163 }
164 }
165
166 if (keep_sending) {
167 sprintf (content, "request #%03d", ++request_nbr); // CONTENT_SIZE
168 if (is_verbose)
169 printf ("client send - routing_id = %s request #%03d\n",
170 routing_id, request_nbr);
171 zmq_atomic_counter_inc (g_clients_pkts_out);
172
173 TEST_ASSERT_EQUAL_INT (CONTENT_SIZE,
174 zmq_send (client, content, CONTENT_SIZE, 0));
175 }
176 }
177
178 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (client));
179 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
180 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (endpoint));
181 free (my_endpoint);
182}
183
184// This is our server task.
185// It uses the multithreaded server model to deal requests out to a pool
186// of workers and route replies back to clients. One worker can handle
187// one request at a time but one client can talk to multiple workers at
188// once.
189
190static void server_worker (void * /*unused_*/);
191
192void server_task (void * /*unused_*/)
193{
194 // Frontend socket talks to clients over TCP
195 char my_endpoint[MAX_SOCKET_STRING];
196 void *frontend = zmq_socket (get_test_context (), ZMQ_ROUTER);
197 TEST_ASSERT_NOT_NULL (frontend);
198 int linger = 0;
199 TEST_ASSERT_SUCCESS_ERRNO (
200 zmq_setsockopt (frontend, ZMQ_LINGER, &linger, sizeof (linger)));
201 bind_loopback_ipv4 (frontend, my_endpoint, sizeof my_endpoint);
202
203 // Backend socket talks to workers over inproc
204 void *backend = zmq_socket (get_test_context (), ZMQ_DEALER);
205 TEST_ASSERT_NOT_NULL (backend);
206 TEST_ASSERT_SUCCESS_ERRNO (
207 zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger)));
208 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "inproc://backend"));
209
210 // Control socket receives terminate command from main over inproc
211 void *control = zmq_socket (get_test_context (), ZMQ_REP);
212 TEST_ASSERT_NOT_NULL (control);
213 TEST_ASSERT_SUCCESS_ERRNO (
214 zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
215 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control_proxy"));
216
217 // Launch pool of worker threads, precise number is not critical
218 int thread_nbr;
219 void *threads[5];
220 for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
221 threads[thread_nbr] = zmq_threadstart (&server_worker, NULL);
222
223 // Endpoint socket sends random port to avoid test failing when port in use
224 void *endpoint_receivers[QT_CLIENTS];
225 char endpoint_source[256];
226 for (int i = 0; i < QT_CLIENTS; ++i) {
227 endpoint_receivers[i] = zmq_socket (get_test_context (), ZMQ_PAIR);
228 TEST_ASSERT_NOT_NULL (endpoint_receivers[i]);
229 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
230 endpoint_receivers[i], ZMQ_LINGER, &linger, sizeof (linger)));
231 sprintf (endpoint_source, "inproc://endpoint%d", i);
232 TEST_ASSERT_SUCCESS_ERRNO (
233 zmq_bind (endpoint_receivers[i], endpoint_source));
234 }
235
236 for (int i = 0; i < QT_CLIENTS; ++i) {
237 send_string_expect_success (endpoint_receivers[i], my_endpoint, 0);
238 }
239
240 // Connect backend to frontend via a proxy
241 TEST_ASSERT_SUCCESS_ERRNO (
242 zmq_proxy_steerable (frontend, backend, NULL, control));
243
244 for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
245 zmq_threadclose (threads[thread_nbr]);
246
247 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (frontend));
248 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (backend));
249 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
250 for (int i = 0; i < QT_CLIENTS; ++i) {
251 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (endpoint_receivers[i]));
252 }
253}
254
255// Each worker task works on one request at a time and sends a random number
256// of replies back, with random delays between replies:
257// The comments in the first column, if suppressed, makes it a poller version
258
259static void server_worker (void * /*unused_*/)
260{
261 void *worker = zmq_socket (get_test_context (), ZMQ_DEALER);
262 TEST_ASSERT_NOT_NULL (worker);
263 int linger = 0;
264 TEST_ASSERT_SUCCESS_ERRNO (
265 zmq_setsockopt (worker, ZMQ_LINGER, &linger, sizeof (linger)));
266 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (worker, "inproc://backend"));
267
268 // Control socket receives terminate command from main over inproc
269 void *control = zmq_socket (get_test_context (), ZMQ_SUB);
270 TEST_ASSERT_NOT_NULL (control);
271 TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
272 TEST_ASSERT_SUCCESS_ERRNO (
273 zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
274 TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control"));
275
276 char content[CONTENT_SIZE_MAX] =
277 {}; // bigger than what we need to check that
278 char routing_id[ROUTING_ID_SIZE_MAX] =
279 {}; // the size received is the size sent
280
281 bool run = true;
282 bool keep_sending = true;
283 while (run) {
284 int rc = zmq_recv (control, content, CONTENT_SIZE_MAX,
285 ZMQ_DONTWAIT); // usually, rc == -1 (no message)
286 if (rc > 0) {
287 content[rc] = 0; // NULL-terminate the command string
288 if (is_verbose)
289 printf ("server_worker receives command = %s\n", content);
290 if (memcmp (content, "TERMINATE", 9) == 0)
291 run = false;
292 if (memcmp (content, "STOP", 4) == 0)
293 keep_sending = false;
294 }
295 // The DEALER socket gives us the reply envelope and message
296 // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
297 rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, ZMQ_DONTWAIT);
298 if (rc == ROUTING_ID_SIZE) {
299 rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
300 TEST_ASSERT_EQUAL_INT (CONTENT_SIZE, rc);
301 if (is_verbose)
302 printf ("server receive - routing_id = %s content = %s\n",
303 routing_id, content);
304
305 // Send 0..4 replies back
306 if (keep_sending) {
307 int reply, replies = rand () % 5;
308 for (reply = 0; reply < replies; reply++) {
309 // Sleep for some fraction of a second
310 msleep (rand () % 10 + 1);
311
312 // Send message from server to client
313 if (is_verbose)
314 printf ("server send - routing_id = %s reply\n",
315 routing_id);
316 zmq_atomic_counter_inc (g_workers_pkts_out);
317
318 rc = zmq_send (worker, routing_id, ROUTING_ID_SIZE,
319 ZMQ_SNDMORE);
320 TEST_ASSERT_EQUAL_INT (ROUTING_ID_SIZE, rc);
321 rc = zmq_send (worker, content, CONTENT_SIZE, 0);
322 TEST_ASSERT_EQUAL_INT (CONTENT_SIZE, rc);
323 }
324 }
325 }
326 }
327 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (worker));
328 TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
329}
330
331uint64_t recv_stat (void *sock_, bool last_)
332{
333 uint64_t res;
334 zmq_msg_t stats_msg;
335
336 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg));
337 TEST_ASSERT_EQUAL_INT (sizeof (uint64_t),
338 zmq_recvmsg (sock_, &stats_msg, 0));
339 memcpy (&res, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg));
340 TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
341
342 int more;
343 size_t moresz = sizeof more;
344 TEST_ASSERT_SUCCESS_ERRNO (
345 zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz));
346 TEST_ASSERT_TRUE ((last_ && !more) || (!last_ && more));
347
348 return res;
349}
350
351// Utility function to interrogate the proxy:
352
353void check_proxy_stats (void *control_proxy_)
354{
355 zmq_proxy_stats_t total_stats;
356
357 send_string_expect_success (control_proxy_, "STATISTICS", 0);
358
359 // first frame of the reply contains FRONTEND stats:
360 total_stats.frontend.msg_in = recv_stat (control_proxy_, false);
361 total_stats.frontend.bytes_in = recv_stat (control_proxy_, false);
362 total_stats.frontend.msg_out = recv_stat (control_proxy_, false);
363 total_stats.frontend.bytes_out = recv_stat (control_proxy_, false);
364
365 // second frame of the reply contains BACKEND stats:
366 total_stats.backend.msg_in = recv_stat (control_proxy_, false);
367 total_stats.backend.bytes_in = recv_stat (control_proxy_, false);
368 total_stats.backend.msg_out = recv_stat (control_proxy_, false);
369 total_stats.backend.bytes_out = recv_stat (control_proxy_, true);
370
371 // check stats
372
373 if (is_verbose) {
374 printf (
375 "frontend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n",
376 static_cast<unsigned long int> (total_stats.frontend.msg_in),
377 static_cast<unsigned long int> (total_stats.frontend.bytes_in),
378 static_cast<unsigned long int> (total_stats.frontend.msg_out),
379 static_cast<unsigned long int> (total_stats.frontend.bytes_out));
380 printf (
381 "backend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n",
382 static_cast<unsigned long int> (total_stats.backend.msg_in),
383 static_cast<unsigned long int> (total_stats.backend.bytes_in),
384 static_cast<unsigned long int> (total_stats.backend.msg_out),
385 static_cast<unsigned long int> (total_stats.backend.bytes_out));
386
387 printf ("clients sent out %d requests\n",
388 zmq_atomic_counter_value (g_clients_pkts_out));
389 printf ("workers sent out %d replies\n",
390 zmq_atomic_counter_value (g_workers_pkts_out));
391 }
392 TEST_ASSERT_EQUAL_UINT (
393 (unsigned) zmq_atomic_counter_value (g_clients_pkts_out),
394 total_stats.frontend.msg_in);
395 TEST_ASSERT_EQUAL_UINT (
396 (unsigned) zmq_atomic_counter_value (g_workers_pkts_out),
397 total_stats.frontend.msg_out);
398 TEST_ASSERT_EQUAL_UINT (
399 (unsigned) zmq_atomic_counter_value (g_workers_pkts_out),
400 total_stats.backend.msg_in);
401 TEST_ASSERT_EQUAL_UINT (
402 (unsigned) zmq_atomic_counter_value (g_clients_pkts_out),
403 total_stats.backend.msg_out);
404}
405
406
407// The main thread simply starts several clients and a server, and then
408// waits for the server to finish.
409
410void test_proxy ()
411{
412 g_clients_pkts_out = zmq_atomic_counter_new ();
413 g_workers_pkts_out = zmq_atomic_counter_new ();
414
415 // Control socket receives terminate command from main over inproc
416 void *control = test_context_socket (ZMQ_PUB);
417 int linger = 0;
418 TEST_ASSERT_SUCCESS_ERRNO (
419 zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
420 TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (control, "inproc://control"));
421
422 // Control socket receives terminate command from main over inproc
423 void *control_proxy = test_context_socket (ZMQ_REQ);
424 TEST_ASSERT_SUCCESS_ERRNO (
425 zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger)));
426 TEST_ASSERT_SUCCESS_ERRNO (
427 zmq_bind (control_proxy, "inproc://control_proxy"));
428
429 void *threads[QT_CLIENTS + 1];
430 struct thread_data databags[QT_CLIENTS + 1];
431 for (int i = 0; i < QT_CLIENTS; i++) {
432 databags[i].id = i;
433 threads[i] = zmq_threadstart (&client_task, &databags[i]);
434 }
435 threads[QT_CLIENTS] = zmq_threadstart (&server_task, NULL);
436 msleep (500); // Run for 500 ms then quit
437
438 if (is_verbose)
439 printf ("stopping all clients and server workers\n");
440 send_string_expect_success (control, "STOP", 0);
441
442 msleep (500); // Wait for all clients and workers to STOP
443
444 if (is_verbose)
445 printf ("retrieving stats from the proxy\n");
446 check_proxy_stats (control_proxy);
447
448 if (is_verbose)
449 printf ("shutting down all clients and server workers\n");
450 send_string_expect_success (control, "TERMINATE", 0);
451
452 if (is_verbose)
453 printf ("shutting down the proxy\n");
454 send_string_expect_success (control_proxy, "TERMINATE", 0);
455
456 test_context_socket_close (control);
457 test_context_socket_close (control_proxy);
458
459 for (int i = 0; i < QT_CLIENTS + 1; i++)
460 zmq_threadclose (threads[i]);
461}
462
463int main (void)
464{
465 setup_test_environment ();
466
467 UNITY_BEGIN ();
468 RUN_TEST (test_proxy);
469 return UNITY_END ();
470}
471