1 | /* |
2 | Copyright (c) 2007-2012 iMatix Corporation |
3 | Copyright (c) 2009-2011 250bpm s.r.o. |
4 | Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file |
5 | |
6 | This file is part of libzmq, the ZeroMQ core engine in C++. |
7 | |
8 | libzmq is free software; you can redistribute it and/or modify it under |
9 | the terms of the GNU Lesser General Public License (LGPL) as published |
10 | by the Free Software Foundation; either version 3 of the License, or |
11 | (at your option) any later version. |
12 | |
13 | As a special exception, the Contributors give you permission to link |
14 | this library with independent modules to produce an executable, |
15 | regardless of the license terms of these independent modules, and to |
16 | copy and distribute the resulting executable under terms of your choice, |
17 | provided that you also meet, for each linked independent module, the |
18 | terms and conditions of the license of that module. An independent |
19 | module is a module which is not derived from or based on this library. |
20 | If you modify this library, you must extend this exception to your |
21 | version of the library. |
22 | |
23 | libzmq is distributed in the hope that it will be useful, but WITHOUT |
24 | ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or |
25 | FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public |
26 | License for more details. |
27 | |
28 | You should have received a copy of the GNU Lesser General Public License |
29 | along with this program. If not, see <http://www.gnu.org/licenses/>. |
30 | */ |
31 | |
#include "../include/zmq.h"

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "platform.hpp"

#if defined ZMQ_HAVE_WINDOWS
#include <windows.h>
#include <process.h>
#else
#include <pthread.h>
#endif
46 | |
/* Number of messages the worker sends and main receives; set in main from
   the second command-line argument. */
static int message_count;
/* Size in bytes of every message; set in main from the first command-line
   argument. */
static size_t message_size;
49 | |
50 | #if defined ZMQ_HAVE_WINDOWS |
51 | static unsigned int __stdcall worker (void *ctx_) |
52 | #else |
53 | static void *worker (void *ctx_) |
54 | #endif |
55 | { |
56 | void *s; |
57 | int rc; |
58 | int i; |
59 | zmq_msg_t msg; |
60 | |
61 | s = zmq_socket (ctx_, ZMQ_PUSH); |
62 | if (!s) { |
63 | printf ("error in zmq_socket: %s\n" , zmq_strerror (errno)); |
64 | exit (1); |
65 | } |
66 | |
67 | rc = zmq_connect (s, "inproc://thr_test" ); |
68 | if (rc != 0) { |
69 | printf ("error in zmq_connect: %s\n" , zmq_strerror (errno)); |
70 | exit (1); |
71 | } |
72 | |
73 | for (i = 0; i != message_count; i++) { |
74 | rc = zmq_msg_init_size (&msg, message_size); |
75 | if (rc != 0) { |
76 | printf ("error in zmq_msg_init_size: %s\n" , zmq_strerror (errno)); |
77 | exit (1); |
78 | } |
79 | #if defined ZMQ_MAKE_VALGRIND_HAPPY |
80 | memset (zmq_msg_data (&msg), 0, message_size); |
81 | #endif |
82 | |
83 | rc = zmq_sendmsg (s, &msg, 0); |
84 | if (rc < 0) { |
85 | printf ("error in zmq_sendmsg: %s\n" , zmq_strerror (errno)); |
86 | exit (1); |
87 | } |
88 | rc = zmq_msg_close (&msg); |
89 | if (rc != 0) { |
90 | printf ("error in zmq_msg_close: %s\n" , zmq_strerror (errno)); |
91 | exit (1); |
92 | } |
93 | } |
94 | |
95 | rc = zmq_close (s); |
96 | if (rc != 0) { |
97 | printf ("error in zmq_close: %s\n" , zmq_strerror (errno)); |
98 | exit (1); |
99 | } |
100 | |
101 | #if defined ZMQ_HAVE_WINDOWS |
102 | return 0; |
103 | #else |
104 | return NULL; |
105 | #endif |
106 | } |
107 | |
108 | int main (int argc, char *argv[]) |
109 | { |
110 | #if defined ZMQ_HAVE_WINDOWS |
111 | HANDLE local_thread; |
112 | #else |
113 | pthread_t local_thread; |
114 | #endif |
115 | void *ctx; |
116 | void *s; |
117 | int rc; |
118 | int i; |
119 | zmq_msg_t msg; |
120 | void *watch; |
121 | unsigned long elapsed; |
122 | unsigned long throughput; |
123 | double megabits; |
124 | |
125 | if (argc != 3) { |
126 | printf ("usage: inproc_thr <message-size> <message-count>\n" ); |
127 | return 1; |
128 | } |
129 | |
130 | message_size = atoi (argv[1]); |
131 | message_count = atoi (argv[2]); |
132 | |
133 | ctx = zmq_init (1); |
134 | if (!ctx) { |
135 | printf ("error in zmq_init: %s\n" , zmq_strerror (errno)); |
136 | return -1; |
137 | } |
138 | |
139 | s = zmq_socket (ctx, ZMQ_PULL); |
140 | if (!s) { |
141 | printf ("error in zmq_socket: %s\n" , zmq_strerror (errno)); |
142 | return -1; |
143 | } |
144 | |
145 | rc = zmq_bind (s, "inproc://thr_test" ); |
146 | if (rc != 0) { |
147 | printf ("error in zmq_bind: %s\n" , zmq_strerror (errno)); |
148 | return -1; |
149 | } |
150 | |
151 | #if defined ZMQ_HAVE_WINDOWS |
152 | local_thread = (HANDLE) _beginthreadex (NULL, 0, worker, ctx, 0, NULL); |
153 | if (local_thread == 0) { |
154 | printf ("error in _beginthreadex\n" ); |
155 | return -1; |
156 | } |
157 | #else |
158 | rc = pthread_create (&local_thread, NULL, worker, ctx); |
159 | if (rc != 0) { |
160 | printf ("error in pthread_create: %s\n" , zmq_strerror (rc)); |
161 | return -1; |
162 | } |
163 | #endif |
164 | |
165 | rc = zmq_msg_init (&msg); |
166 | if (rc != 0) { |
167 | printf ("error in zmq_msg_init: %s\n" , zmq_strerror (errno)); |
168 | return -1; |
169 | } |
170 | |
171 | printf ("message size: %d [B]\n" , (int) message_size); |
172 | printf ("message count: %d\n" , (int) message_count); |
173 | |
174 | rc = zmq_recvmsg (s, &msg, 0); |
175 | if (rc < 0) { |
176 | printf ("error in zmq_recvmsg: %s\n" , zmq_strerror (errno)); |
177 | return -1; |
178 | } |
179 | if (zmq_msg_size (&msg) != message_size) { |
180 | printf ("message of incorrect size received\n" ); |
181 | return -1; |
182 | } |
183 | |
184 | watch = zmq_stopwatch_start (); |
185 | |
186 | for (i = 0; i != message_count - 1; i++) { |
187 | rc = zmq_recvmsg (s, &msg, 0); |
188 | if (rc < 0) { |
189 | printf ("error in zmq_recvmsg: %s\n" , zmq_strerror (errno)); |
190 | return -1; |
191 | } |
192 | if (zmq_msg_size (&msg) != message_size) { |
193 | printf ("message of incorrect size received\n" ); |
194 | return -1; |
195 | } |
196 | } |
197 | |
198 | elapsed = zmq_stopwatch_stop (watch); |
199 | if (elapsed == 0) |
200 | elapsed = 1; |
201 | |
202 | rc = zmq_msg_close (&msg); |
203 | if (rc != 0) { |
204 | printf ("error in zmq_msg_close: %s\n" , zmq_strerror (errno)); |
205 | return -1; |
206 | } |
207 | |
208 | #if defined ZMQ_HAVE_WINDOWS |
209 | DWORD rc2 = WaitForSingleObject (local_thread, INFINITE); |
210 | if (rc2 == WAIT_FAILED) { |
211 | printf ("error in WaitForSingleObject\n" ); |
212 | return -1; |
213 | } |
214 | BOOL rc3 = CloseHandle (local_thread); |
215 | if (rc3 == 0) { |
216 | printf ("error in CloseHandle\n" ); |
217 | return -1; |
218 | } |
219 | #else |
220 | rc = pthread_join (local_thread, NULL); |
221 | if (rc != 0) { |
222 | printf ("error in pthread_join: %s\n" , zmq_strerror (rc)); |
223 | return -1; |
224 | } |
225 | #endif |
226 | |
227 | rc = zmq_close (s); |
228 | if (rc != 0) { |
229 | printf ("error in zmq_close: %s\n" , zmq_strerror (errno)); |
230 | return -1; |
231 | } |
232 | |
233 | rc = zmq_ctx_term (ctx); |
234 | if (rc != 0) { |
235 | printf ("error in zmq_ctx_term: %s\n" , zmq_strerror (errno)); |
236 | return -1; |
237 | } |
238 | |
239 | throughput = |
240 | (unsigned long) ((double) message_count / (double) elapsed * 1000000); |
241 | megabits = (double) (throughput * message_size * 8) / 1000000; |
242 | |
243 | printf ("mean throughput: %d [msg/s]\n" , (int) throughput); |
244 | printf ("mean throughput: %.3f [Mb/s]\n" , (double) megabits); |
245 | |
246 | return 0; |
247 | } |
248 | |