1 | /* |
2 | Copyright 2011 Kristian Nielsen and Monty Program Ab. |
3 | |
4 | This file is free software; you can redistribute it and/or |
5 | modify it under the terms of the GNU Lesser General Public |
6 | License as published by the Free Software Foundation; either |
7 | version 2.1 of the License, or (at your option) any later version. |
8 | |
9 | This library is distributed in the hope that it will be useful, |
10 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
12 | Lesser General Public License for more details. |
13 | |
14 | You should have received a copy of the GNU General Public License |
15 | along with this. If not, see <http://www.gnu.org/licenses/>. |
16 | */ |
17 | |
18 | |
19 | /* |
20 | Run a set of queries in parallel against a server using the non-blocking |
21 | API, and compare to running same queries with the normal blocking API. |
22 | */ |
23 | |
24 | #include <my_global.h> |
25 | #include <my_sys.h> |
26 | #include <mysql.h> |
27 | #include <my_getopt.h> |
28 | |
29 | #include <sys/time.h> |
30 | #include <stdlib.h> |
31 | #include <stdio.h> |
32 | #include <string.h> |
33 | |
34 | #include <event.h> |
35 | |
36 | |
37 | #define SL(s) (s), sizeof(s) |
38 | static const char *my_groups[]= { "client" , NULL }; |
39 | |
40 | /* Maintaining a list of queries to run. */ |
41 | struct query_entry { |
42 | struct query_entry *next; |
43 | char *query; |
44 | int index; |
45 | }; |
46 | static struct query_entry *query_list; |
47 | static struct query_entry **tail_ptr= &query_list; |
48 | static int query_counter= 0; |
49 | |
50 | |
51 | /* State kept for each connection. */ |
52 | struct state_data { |
53 | int ST; /* State machine current state */ |
54 | struct event ev_mysql; |
55 | MYSQL mysql; |
56 | MYSQL_RES *result; |
57 | MYSQL *ret; |
58 | int err; |
59 | MYSQL_ROW row; |
60 | struct query_entry *query_element; |
61 | int index; |
62 | }; |
63 | |
64 | |
65 | static const char *opt_db= NULL; |
66 | static const char *opt_user= NULL; |
67 | static const char *opt_password= NULL; |
68 | static int tty_password= 0; |
69 | static const char *opt_host= NULL; |
70 | static const char *opt_socket= NULL; |
71 | static unsigned int opt_port= 0; |
72 | static unsigned int opt_connections= 5; |
73 | static const char *opt_query_file= NULL; |
74 | |
75 | static struct my_option options[] = |
76 | { |
77 | {"database" , 'D', "Database to use" , &opt_db, &opt_db, |
78 | 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, |
79 | {"help" , '?', "Display this help and exit" , 0, 0, 0, GET_NO_ARG, NO_ARG, 0, |
80 | 0, 0, 0, 0, 0}, |
81 | {"host" , 'h', "Connect to host" , &opt_host, &opt_host, |
82 | 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, |
83 | {"password" , 'p', |
84 | "Password to use when connecting to server. If password is not given it's asked from the tty." , |
85 | 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0}, |
86 | {"port" , 'P', "Port number to use for connection." , |
87 | &opt_port, &opt_port, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, |
88 | {"socket" , 'S', "Socket file to use for connection" , |
89 | &opt_socket, &opt_socket, 0, GET_STR, |
90 | REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, |
91 | {"user" , 'u', "User for login if not current user" , &opt_user, |
92 | &opt_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, |
93 | {"connections" , 'n', "Number of simultaneous connections/queries." , |
94 | &opt_connections, &opt_connections, 0, GET_UINT, REQUIRED_ARG, |
95 | 5, 0, 0, 0, 0, 0}, |
96 | {"queryfile" , 'q', "Name of file containing extra queries to run" , |
97 | &opt_query_file, &opt_query_file, 0, GET_STR, REQUIRED_ARG, |
98 | 0, 0, 0, 0, 0, 0}, |
99 | { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} |
100 | }; |
101 | |
102 | static void |
103 | fatal(struct state_data *sd, const char *msg) |
104 | { |
105 | fprintf(stderr, "%s: %s\n" , msg, (sd ? mysql_error(&sd->mysql) : "" )); |
106 | exit(1); |
107 | } |
108 | |
109 | |
110 | static void state_machine_handler(int fd, short event, void *arg); |
111 | |
112 | static void |
113 | next_event(int new_st, int status, struct state_data *sd) |
114 | { |
115 | short wait_event= 0; |
116 | struct timeval tv, *ptv; |
117 | int fd; |
118 | |
119 | if (status & MYSQL_WAIT_READ) |
120 | wait_event|= EV_READ; |
121 | if (status & MYSQL_WAIT_WRITE) |
122 | wait_event|= EV_WRITE; |
123 | if (wait_event) |
124 | fd= mysql_get_socket(&sd->mysql); |
125 | else |
126 | fd= -1; |
127 | if (status & MYSQL_WAIT_TIMEOUT) |
128 | { |
129 | tv.tv_sec= mysql_get_timeout_value(&sd->mysql); |
130 | tv.tv_usec= 0; |
131 | ptv= &tv; |
132 | } |
133 | else |
134 | ptv= NULL; |
135 | event_set(&sd->ev_mysql, fd, wait_event, state_machine_handler, sd); |
136 | event_add(&sd->ev_mysql, ptv); |
137 | sd->ST= new_st; |
138 | } |
139 | |
140 | static int |
141 | mysql_status(short event) |
142 | { |
143 | int status= 0; |
144 | if (event & EV_READ) |
145 | status|= MYSQL_WAIT_READ; |
146 | if (event & EV_WRITE) |
147 | status|= MYSQL_WAIT_WRITE; |
148 | if (event & EV_TIMEOUT) |
149 | status|= MYSQL_WAIT_TIMEOUT; |
150 | return status; |
151 | } |
152 | |
153 | |
154 | static int num_active_connections; |
155 | |
156 | /* Shortcut for going to new state immediately without waiting. */ |
157 | #define NEXT_IMMEDIATE(sd_, new_st) do { sd_->ST= new_st; goto again; } while (0) |
158 | |
159 | static void |
160 | state_machine_handler(int fd __attribute__((unused)), short event, void *arg) |
161 | { |
162 | struct state_data *sd= arg; |
163 | int status; |
164 | |
165 | again: |
166 | switch(sd->ST) |
167 | { |
168 | case 0: |
169 | /* Initial state, start making the connection. */ |
170 | status= mysql_real_connect_start(&sd->ret, &sd->mysql, opt_host, opt_user, opt_password, opt_db, opt_port, opt_socket, 0); |
171 | if (status) |
172 | /* Wait for connect to complete. */ |
173 | next_event(1, status, sd); |
174 | else |
175 | NEXT_IMMEDIATE(sd, 9); |
176 | break; |
177 | |
178 | case 1: |
179 | status= mysql_real_connect_cont(&sd->ret, &sd->mysql, mysql_status(event)); |
180 | if (status) |
181 | next_event(1, status, sd); |
182 | else |
183 | NEXT_IMMEDIATE(sd, 9); |
184 | break; |
185 | |
186 | case 9: |
187 | if (!sd->ret) |
188 | fatal(sd, "Failed to mysql_real_connect()" ); |
189 | NEXT_IMMEDIATE(sd, 10); |
190 | break; |
191 | |
192 | case 10: |
193 | /* Now run the next query. */ |
194 | sd->query_element= query_list; |
195 | if (!sd->query_element) |
196 | { |
197 | /* No more queries, end the connection. */ |
198 | NEXT_IMMEDIATE(sd, 40); |
199 | } |
200 | query_list= query_list->next; |
201 | |
202 | sd->index= sd->query_element->index; |
203 | printf("%d ! %s\n" , sd->index, sd->query_element->query); |
204 | status= mysql_real_query_start(&sd->err, &sd->mysql, sd->query_element->query, |
205 | strlen(sd->query_element->query)); |
206 | if (status) |
207 | next_event(11, status, sd); |
208 | else |
209 | NEXT_IMMEDIATE(sd, 20); |
210 | break; |
211 | |
212 | case 11: |
213 | status= mysql_real_query_cont(&sd->err, &sd->mysql, mysql_status(event)); |
214 | if (status) |
215 | next_event(11, status, sd); |
216 | else |
217 | NEXT_IMMEDIATE(sd, 20); |
218 | break; |
219 | |
220 | case 20: |
221 | my_free(sd->query_element->query); |
222 | my_free(sd->query_element); |
223 | if (sd->err) |
224 | { |
225 | printf("%d | Error: %s\n" , sd->index, mysql_error(&sd->mysql)); |
226 | NEXT_IMMEDIATE(sd, 10); |
227 | } |
228 | else |
229 | { |
230 | sd->result= mysql_use_result(&sd->mysql); |
231 | if (!sd->result) |
232 | fatal(sd, "mysql_use_result() returns error" ); |
233 | NEXT_IMMEDIATE(sd, 30); |
234 | } |
235 | break; |
236 | |
237 | case 30: |
238 | status= mysql_fetch_row_start(&sd->row, sd->result); |
239 | if (status) |
240 | next_event(31, status, sd); |
241 | else |
242 | NEXT_IMMEDIATE(sd, 39); |
243 | break; |
244 | |
245 | case 31: |
246 | status= mysql_fetch_row_cont(&sd->row, sd->result, mysql_status(event)); |
247 | if (status) |
248 | next_event(31, status, sd); |
249 | else |
250 | NEXT_IMMEDIATE(sd, 39); |
251 | break; |
252 | |
253 | case 39: |
254 | if (sd->row) |
255 | { |
256 | /* Got a row. */ |
257 | unsigned int i; |
258 | printf("%d - " , sd->index); |
259 | for (i= 0; i < mysql_num_fields(sd->result); i++) |
260 | printf("%s%s" , (i ? "\t" : "" ), (sd->row[i] ? sd->row[i] : "(null)" )); |
261 | printf ("\n" ); |
262 | NEXT_IMMEDIATE(sd, 30); |
263 | } |
264 | else |
265 | { |
266 | if (mysql_errno(&sd->mysql)) |
267 | { |
268 | /* An error occurred. */ |
269 | printf("%d | Error: %s\n" , sd->index, mysql_error(&sd->mysql)); |
270 | } |
271 | else |
272 | { |
273 | /* EOF. */ |
274 | printf("%d | EOF\n" , sd->index); |
275 | } |
276 | mysql_free_result(sd->result); |
277 | NEXT_IMMEDIATE(sd, 10); |
278 | } |
279 | break; |
280 | |
281 | case 40: |
282 | status= mysql_close_start(&sd->mysql); |
283 | if (status) |
284 | next_event(41, status, sd); |
285 | else |
286 | NEXT_IMMEDIATE(sd, 50); |
287 | break; |
288 | |
289 | case 41: |
290 | status= mysql_close_cont(&sd->mysql, mysql_status(event)); |
291 | if (status) |
292 | next_event(41, status, sd); |
293 | else |
294 | NEXT_IMMEDIATE(sd, 50); |
295 | break; |
296 | |
297 | case 50: |
298 | /* We are done! */ |
299 | num_active_connections--; |
300 | if (num_active_connections == 0) |
301 | event_loopbreak(); |
302 | break; |
303 | |
304 | default: |
305 | abort(); |
306 | } |
307 | } |
308 | |
309 | |
310 | void |
311 | add_query(const char *q) |
312 | { |
313 | struct query_entry *e; |
314 | char *q2; |
315 | size_t len; |
316 | |
317 | e= my_malloc(sizeof(*e), MYF(0)); |
318 | q2= my_strdup(q, MYF(0)); |
319 | if (!e || !q2) |
320 | fatal(NULL, "Out of memory" ); |
321 | |
322 | /* Remove any trailing newline. */ |
323 | len= strlen(q2); |
324 | if (q2[len] == '\n') |
325 | q2[len--]= '\0'; |
326 | if (q2[len] == '\r') |
327 | q2[len--]= '\0'; |
328 | |
329 | e->next= NULL; |
330 | e->query= q2; |
331 | e->index= query_counter++; |
332 | *tail_ptr= e; |
333 | tail_ptr= &e->next; |
334 | } |
335 | |
336 | |
337 | static my_bool |
338 | handle_option(int optid, const struct my_option *opt __attribute__((unused)), |
339 | char *arg) |
340 | { |
341 | switch (optid) |
342 | { |
343 | case '?': |
344 | printf("Usage: async_queries [OPTIONS] query ...\n" ); |
345 | my_print_help(options); |
346 | my_print_variables(options); |
347 | exit(0); |
348 | break; |
349 | |
350 | case 'p': |
351 | if (arg) |
352 | opt_password= arg; |
353 | else |
354 | tty_password= 1; |
355 | break; |
356 | } |
357 | |
358 | return 0; |
359 | } |
360 | |
361 | |
362 | int |
363 | main(int argc, char *argv[]) |
364 | { |
365 | struct state_data *sds; |
366 | unsigned int i; |
367 | int err; |
368 | struct event_base *libevent_base; |
369 | |
370 | err= handle_options(&argc, &argv, options, handle_option); |
371 | if (err) |
372 | exit(err); |
373 | if (tty_password) |
374 | opt_password= get_tty_password(NullS); |
375 | |
376 | if (opt_query_file) |
377 | { |
378 | FILE *f= fopen(opt_query_file, "r" ); |
379 | char buf[65536]; |
380 | if (!f) |
381 | fatal(NULL, "Cannot open query file" ); |
382 | while (!feof(f)) |
383 | { |
384 | if (!fgets(buf, sizeof(buf), f)) |
385 | break; |
386 | add_query(buf); |
387 | } |
388 | fclose(f); |
389 | } |
390 | /* Add extra queries directly on command line. */ |
391 | while (argc > 0) |
392 | { |
393 | --argc; |
394 | add_query(*argv++); |
395 | } |
396 | |
397 | sds= my_malloc(opt_connections * sizeof(*sds), MYF(0)); |
398 | if (!sds) |
399 | fatal(NULL, "Out of memory" ); |
400 | |
401 | libevent_base= event_init(); |
402 | |
403 | err= mysql_library_init(argc, argv, (char **)my_groups); |
404 | if (err) |
405 | { |
406 | fprintf(stderr, "Fatal: mysql_library_init() returns error: %d\n" , err); |
407 | exit(1); |
408 | } |
409 | |
410 | num_active_connections= 0; |
411 | for (i= 0; i < opt_connections; i++) |
412 | { |
413 | mysql_init(&sds[i].mysql); |
414 | mysql_options(&sds[i].mysql, MYSQL_OPT_NONBLOCK, 0); |
415 | mysql_options(&sds[i].mysql, MYSQL_READ_DEFAULT_GROUP, "async_queries" ); |
416 | |
417 | /* |
418 | We put the initial connect call in the first state 0 of the state machine |
419 | and run that manually, just to have everything in one place. |
420 | */ |
421 | sds[i].ST= 0; |
422 | num_active_connections++; |
423 | state_machine_handler(-1, -1, &sds[i]); |
424 | } |
425 | |
426 | event_dispatch(); |
427 | |
428 | my_free(sds); |
429 | |
430 | mysql_library_end(); |
431 | |
432 | event_base_free(libevent_base); |
433 | |
434 | return 0; |
435 | } |
436 | |