1/*
2 Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
3
4 This file is part of libzmq, the ZeroMQ core engine in C++.
5
6 libzmq is free software; you can redistribute it and/or modify it under
7 the terms of the GNU Lesser General Public License (LGPL) as published
8 by the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
10
11 As a special exception, the Contributors give you permission to link
12 this library with independent modules to produce an executable,
13 regardless of the license terms of these independent modules, and to
14 copy and distribute the resulting executable under terms of your choice,
15 provided that you also meet, for each linked independent module, the
16 terms and conditions of the license of that module. An independent
17 module is a module which is not derived from or based on this library.
18 If you modify this library, you must extend this exception to your
19 version of the library.
20
21 libzmq is distributed in the hope that it will be useful, but WITHOUT
22 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
23 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
24 License for more details.
25
26 You should have received a copy of the GNU Lesser General Public License
27 along with this program. If not, see <http://www.gnu.org/licenses/>.
28*/
29
#include "../include/zmq.h"
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
33
//  CURVE secret key installed on the socket when the optional
//  <enable_curve> argument is non-zero. It is a 40-character printable
//  string; main passes sizeof (server_prvkey), i.e. the 40 key characters
//  plus the terminating NUL, to zmq_setsockopt. The value itself is
//  arbitrary, but the connecting peer must use the matching public key.
//  NOTE(review): the previous comment said the keys must match
//  remote_lat.cpp; for this throughput tool the counterpart is presumably
//  the remote throughput program - confirm.
static const char server_prvkey[] = "{X}#>t#jRGaQ}gMhv=30r(Mw+87YGs+5%kh=i@f8";
36
37int main (int argc, char *argv[])
38{
39 const char *bind_to;
40 int message_count;
41 size_t message_size;
42 void *ctx;
43 void *s;
44 int rc;
45 int i;
46 zmq_msg_t msg;
47 void *watch;
48 unsigned long elapsed;
49 double throughput;
50 double megabits;
51 int curve = 0;
52
53 if (argc != 4 && argc != 5) {
54 printf ("usage: local_thr <bind-to> <message-size> <message-count> "
55 "[<enable_curve>]\n");
56 return 1;
57 }
58 bind_to = argv[1];
59 message_size = atoi (argv[2]);
60 message_count = atoi (argv[3]);
61 if (argc >= 5 && atoi (argv[4])) {
62 curve = 1;
63 }
64
65 ctx = zmq_init (1);
66 if (!ctx) {
67 printf ("error in zmq_init: %s\n", zmq_strerror (errno));
68 return -1;
69 }
70
71 s = zmq_socket (ctx, ZMQ_PULL);
72 if (!s) {
73 printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
74 return -1;
75 }
76
77 // Add your socket options here.
78 // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
79 if (curve) {
80 rc = zmq_setsockopt (s, ZMQ_CURVE_SECRETKEY, server_prvkey,
81 sizeof (server_prvkey));
82 if (rc != 0) {
83 printf ("error in zmq_setsockoopt: %s\n", zmq_strerror (errno));
84 return -1;
85 }
86 int server = 1;
87 rc = zmq_setsockopt (s, ZMQ_CURVE_SERVER, &server, sizeof (int));
88 if (rc != 0) {
89 printf ("error in zmq_setsockoopt: %s\n", zmq_strerror (errno));
90 return -1;
91 }
92 }
93
94 rc = zmq_bind (s, bind_to);
95 if (rc != 0) {
96 printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
97 return -1;
98 }
99
100 rc = zmq_msg_init (&msg);
101 if (rc != 0) {
102 printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
103 return -1;
104 }
105
106 rc = zmq_recvmsg (s, &msg, 0);
107 if (rc < 0) {
108 printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
109 return -1;
110 }
111 if (zmq_msg_size (&msg) != message_size) {
112 printf ("message of incorrect size received\n");
113 return -1;
114 }
115
116 watch = zmq_stopwatch_start ();
117
118 for (i = 0; i != message_count - 1; i++) {
119 rc = zmq_recvmsg (s, &msg, 0);
120 if (rc < 0) {
121 printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
122 return -1;
123 }
124 if (zmq_msg_size (&msg) != message_size) {
125 printf ("message of incorrect size received\n");
126 return -1;
127 }
128 }
129
130 elapsed = zmq_stopwatch_stop (watch);
131 if (elapsed == 0)
132 elapsed = 1;
133
134 rc = zmq_msg_close (&msg);
135 if (rc != 0) {
136 printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
137 return -1;
138 }
139
140 throughput = ((double) message_count / (double) elapsed * 1000000);
141 megabits = ((double) throughput * message_size * 8) / 1000000;
142
143 printf ("message size: %d [B]\n", (int) message_size);
144 printf ("message count: %d\n", (int) message_count);
145 printf ("mean throughput: %d [msg/s]\n", (int) throughput);
146 printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
147
148 rc = zmq_close (s);
149 if (rc != 0) {
150 printf ("error in zmq_close: %s\n", zmq_strerror (errno));
151 return -1;
152 }
153
154 rc = zmq_ctx_term (ctx);
155 if (rc != 0) {
156 printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
157 return -1;
158 }
159
160 return 0;
161}
162