1/*
2 Copyright (c) 2007-2012 iMatix Corporation
3 Copyright (c) 2009-2011 250bpm s.r.o.
4 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
5
6 This file is part of libzmq, the ZeroMQ core engine in C++.
7
8 libzmq is free software; you can redistribute it and/or modify it under
9 the terms of the GNU Lesser General Public License (LGPL) as published
10 by the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
12
13 As a special exception, the Contributors give you permission to link
14 this library with independent modules to produce an executable,
15 regardless of the license terms of these independent modules, and to
16 copy and distribute the resulting executable under terms of your choice,
17 provided that you also meet, for each linked independent module, the
18 terms and conditions of the license of that module. An independent
19 module is a module which is not derived from or based on this library.
20 If you modify this library, you must extend this exception to your
21 version of the library.
22
23 libzmq is distributed in the hope that it will be useful, but WITHOUT
24 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
25 FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
26 License for more details.
27
28 You should have received a copy of the GNU Lesser General Public License
29 along with this program. If not, see <http://www.gnu.org/licenses/>.
30*/
31
32#include "../include/zmq.h"
33
34#include <stdio.h>
35#include <stdlib.h>
36#include <string.h>
37
38#include "platform.hpp"
39
40#if defined ZMQ_HAVE_WINDOWS
41#include <windows.h>
42#include <process.h>
43#else
44#include <pthread.h>
45#endif
46
47static int message_count;
48static size_t message_size;
49
50#if defined ZMQ_HAVE_WINDOWS
51static unsigned int __stdcall worker (void *ctx_)
52#else
53static void *worker (void *ctx_)
54#endif
55{
56 void *s;
57 int rc;
58 int i;
59 zmq_msg_t msg;
60
61 s = zmq_socket (ctx_, ZMQ_PUSH);
62 if (!s) {
63 printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
64 exit (1);
65 }
66
67 rc = zmq_connect (s, "inproc://thr_test");
68 if (rc != 0) {
69 printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
70 exit (1);
71 }
72
73 for (i = 0; i != message_count; i++) {
74 rc = zmq_msg_init_size (&msg, message_size);
75 if (rc != 0) {
76 printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno));
77 exit (1);
78 }
79#if defined ZMQ_MAKE_VALGRIND_HAPPY
80 memset (zmq_msg_data (&msg), 0, message_size);
81#endif
82
83 rc = zmq_sendmsg (s, &msg, 0);
84 if (rc < 0) {
85 printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
86 exit (1);
87 }
88 rc = zmq_msg_close (&msg);
89 if (rc != 0) {
90 printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
91 exit (1);
92 }
93 }
94
95 rc = zmq_close (s);
96 if (rc != 0) {
97 printf ("error in zmq_close: %s\n", zmq_strerror (errno));
98 exit (1);
99 }
100
101#if defined ZMQ_HAVE_WINDOWS
102 return 0;
103#else
104 return NULL;
105#endif
106}
107
108int main (int argc, char *argv[])
109{
110#if defined ZMQ_HAVE_WINDOWS
111 HANDLE local_thread;
112#else
113 pthread_t local_thread;
114#endif
115 void *ctx;
116 void *s;
117 int rc;
118 int i;
119 zmq_msg_t msg;
120 void *watch;
121 unsigned long elapsed;
122 unsigned long throughput;
123 double megabits;
124
125 if (argc != 3) {
126 printf ("usage: inproc_thr <message-size> <message-count>\n");
127 return 1;
128 }
129
130 message_size = atoi (argv[1]);
131 message_count = atoi (argv[2]);
132
133 ctx = zmq_init (1);
134 if (!ctx) {
135 printf ("error in zmq_init: %s\n", zmq_strerror (errno));
136 return -1;
137 }
138
139 s = zmq_socket (ctx, ZMQ_PULL);
140 if (!s) {
141 printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
142 return -1;
143 }
144
145 rc = zmq_bind (s, "inproc://thr_test");
146 if (rc != 0) {
147 printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
148 return -1;
149 }
150
151#if defined ZMQ_HAVE_WINDOWS
152 local_thread = (HANDLE) _beginthreadex (NULL, 0, worker, ctx, 0, NULL);
153 if (local_thread == 0) {
154 printf ("error in _beginthreadex\n");
155 return -1;
156 }
157#else
158 rc = pthread_create (&local_thread, NULL, worker, ctx);
159 if (rc != 0) {
160 printf ("error in pthread_create: %s\n", zmq_strerror (rc));
161 return -1;
162 }
163#endif
164
165 rc = zmq_msg_init (&msg);
166 if (rc != 0) {
167 printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
168 return -1;
169 }
170
171 printf ("message size: %d [B]\n", (int) message_size);
172 printf ("message count: %d\n", (int) message_count);
173
174 rc = zmq_recvmsg (s, &msg, 0);
175 if (rc < 0) {
176 printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
177 return -1;
178 }
179 if (zmq_msg_size (&msg) != message_size) {
180 printf ("message of incorrect size received\n");
181 return -1;
182 }
183
184 watch = zmq_stopwatch_start ();
185
186 for (i = 0; i != message_count - 1; i++) {
187 rc = zmq_recvmsg (s, &msg, 0);
188 if (rc < 0) {
189 printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
190 return -1;
191 }
192 if (zmq_msg_size (&msg) != message_size) {
193 printf ("message of incorrect size received\n");
194 return -1;
195 }
196 }
197
198 elapsed = zmq_stopwatch_stop (watch);
199 if (elapsed == 0)
200 elapsed = 1;
201
202 rc = zmq_msg_close (&msg);
203 if (rc != 0) {
204 printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
205 return -1;
206 }
207
208#if defined ZMQ_HAVE_WINDOWS
209 DWORD rc2 = WaitForSingleObject (local_thread, INFINITE);
210 if (rc2 == WAIT_FAILED) {
211 printf ("error in WaitForSingleObject\n");
212 return -1;
213 }
214 BOOL rc3 = CloseHandle (local_thread);
215 if (rc3 == 0) {
216 printf ("error in CloseHandle\n");
217 return -1;
218 }
219#else
220 rc = pthread_join (local_thread, NULL);
221 if (rc != 0) {
222 printf ("error in pthread_join: %s\n", zmq_strerror (rc));
223 return -1;
224 }
225#endif
226
227 rc = zmq_close (s);
228 if (rc != 0) {
229 printf ("error in zmq_close: %s\n", zmq_strerror (errno));
230 return -1;
231 }
232
233 rc = zmq_ctx_term (ctx);
234 if (rc != 0) {
235 printf ("error in zmq_ctx_term: %s\n", zmq_strerror (errno));
236 return -1;
237 }
238
239 throughput =
240 (unsigned long) ((double) message_count / (double) elapsed * 1000000);
241 megabits = (double) (throughput * message_size * 8) / 1000000;
242
243 printf ("mean throughput: %d [msg/s]\n", (int) throughput);
244 printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);
245
246 return 0;
247}
248