| 1 | /* |
| 2 | Copyright 2011 Kristian Nielsen and Monty Program Ab. |
| 3 | |
| 4 | This file is free software; you can redistribute it and/or |
| 5 | modify it under the terms of the GNU Lesser General Public |
| 6 | License as published by the Free Software Foundation; either |
| 7 | version 2.1 of the License, or (at your option) any later version. |
| 8 | |
| 9 | This library is distributed in the hope that it will be useful, |
| 10 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| 12 | Lesser General Public License for more details. |
| 13 | |
| 14 | You should have received a copy of the GNU General Public License |
| 15 | along with this. If not, see <http://www.gnu.org/licenses/>. |
| 16 | */ |
| 17 | |
| 18 | |
| 19 | /* |
| 20 | Run a set of queries in parallel against a server using the non-blocking |
| 21 | API, and compare to running same queries with the normal blocking API. |
| 22 | */ |
| 23 | |
| 24 | #include <my_global.h> |
| 25 | #include <my_sys.h> |
| 26 | #include <mysql.h> |
| 27 | #include <my_getopt.h> |
| 28 | |
| 29 | #include <sys/time.h> |
| 30 | #include <stdlib.h> |
| 31 | #include <stdio.h> |
| 32 | #include <string.h> |
| 33 | |
| 34 | #include <event.h> |
| 35 | |
| 36 | |
/*
  Expand an array-typed argument (typically a string literal) into two
  comma-separated arguments: the value itself and its sizeof.
*/
#define SL(s) (s), sizeof(s)

/*
  NULL-terminated list of group names; main() passes it as the third
  argument of mysql_library_init().
*/
static const char *my_groups[]=
{
  "client",
  NULL
};
| 39 | |
/*
  Queries waiting to be run are kept in a singly linked FIFO list.
  add_query() appends at the tail; the connection state machine removes
  entries from the head and frees them once the query has been sent.
*/
struct query_entry {
  struct query_entry *next;     /* Following entry, NULL for the last one */
  char *query;                  /* Query text, owned by this entry */
  int index;                    /* Sequence number taken from query_counter */
};

/* Head of the pending list; NULL when no queries are left. */
static struct query_entry *query_list;
/* Location of the pointer that the next appended entry is stored into. */
static struct query_entry **tail_ptr= &query_list;
/* Number of queries added so far; supplies query_entry.index. */
static int query_counter= 0;
| 49 | |
| 50 | |
| 51 | /* State kept for each connection. */ |
| 52 | struct state_data { |
| 53 | int ST; /* State machine current state */ |
| 54 | struct event ev_mysql; |
| 55 | MYSQL mysql; |
| 56 | MYSQL_RES *result; |
| 57 | MYSQL *ret; |
| 58 | int err; |
| 59 | MYSQL_ROW row; |
| 60 | struct query_entry *query_element; |
| 61 | int index; |
| 62 | }; |
| 63 | |
| 64 | |
/*
  Values of the command-line options.  Apart from the password, each one is
  filled in through the matching entry of the options[] table.
*/
static const char *opt_db= NULL;            /* --database */
static const char *opt_user= NULL;          /* --user */
/* --password; set in handle_option() or read from the tty in main(). */
static const char *opt_password= NULL;
/* Set when --password was given without a value. */
static int tty_password= 0;
static const char *opt_host= NULL;          /* --host */
static const char *opt_socket= NULL;        /* --socket */
static unsigned int opt_port= 0;            /* --port */
static unsigned int opt_connections= 5;     /* --connections */
static const char *opt_query_file= NULL;    /* --queryfile */
| 74 | |
| 75 | static struct my_option options[] = |
| 76 | { |
| 77 | {"database" , 'D', "Database to use" , &opt_db, &opt_db, |
| 78 | 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, |
| 79 | {"help" , '?', "Display this help and exit" , 0, 0, 0, GET_NO_ARG, NO_ARG, 0, |
| 80 | 0, 0, 0, 0, 0}, |
| 81 | {"host" , 'h', "Connect to host" , &opt_host, &opt_host, |
| 82 | 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, |
| 83 | {"password" , 'p', |
| 84 | "Password to use when connecting to server. If password is not given it's asked from the tty." , |
| 85 | 0, 0, 0, GET_STR, OPT_ARG, 0, 0, 0, 0, 0, 0}, |
| 86 | {"port" , 'P', "Port number to use for connection." , |
| 87 | &opt_port, &opt_port, 0, GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, |
| 88 | {"socket" , 'S', "Socket file to use for connection" , |
| 89 | &opt_socket, &opt_socket, 0, GET_STR, |
| 90 | REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, |
| 91 | {"user" , 'u', "User for login if not current user" , &opt_user, |
| 92 | &opt_user, 0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0}, |
| 93 | {"connections" , 'n', "Number of simultaneous connections/queries." , |
| 94 | &opt_connections, &opt_connections, 0, GET_UINT, REQUIRED_ARG, |
| 95 | 5, 0, 0, 0, 0, 0}, |
| 96 | {"queryfile" , 'q', "Name of file containing extra queries to run" , |
| 97 | &opt_query_file, &opt_query_file, 0, GET_STR, REQUIRED_ARG, |
| 98 | 0, 0, 0, 0, 0, 0}, |
| 99 | { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} |
| 100 | }; |
| 101 | |
| 102 | static void |
| 103 | fatal(struct state_data *sd, const char *msg) |
| 104 | { |
| 105 | fprintf(stderr, "%s: %s\n" , msg, (sd ? mysql_error(&sd->mysql) : "" )); |
| 106 | exit(1); |
| 107 | } |
| 108 | |
| 109 | |
| 110 | static void state_machine_handler(int fd, short event, void *arg); |
| 111 | |
| 112 | static void |
| 113 | next_event(int new_st, int status, struct state_data *sd) |
| 114 | { |
| 115 | short wait_event= 0; |
| 116 | struct timeval tv, *ptv; |
| 117 | int fd; |
| 118 | |
| 119 | if (status & MYSQL_WAIT_READ) |
| 120 | wait_event|= EV_READ; |
| 121 | if (status & MYSQL_WAIT_WRITE) |
| 122 | wait_event|= EV_WRITE; |
| 123 | if (wait_event) |
| 124 | fd= mysql_get_socket(&sd->mysql); |
| 125 | else |
| 126 | fd= -1; |
| 127 | if (status & MYSQL_WAIT_TIMEOUT) |
| 128 | { |
| 129 | tv.tv_sec= mysql_get_timeout_value(&sd->mysql); |
| 130 | tv.tv_usec= 0; |
| 131 | ptv= &tv; |
| 132 | } |
| 133 | else |
| 134 | ptv= NULL; |
| 135 | event_set(&sd->ev_mysql, fd, wait_event, state_machine_handler, sd); |
| 136 | event_add(&sd->ev_mysql, ptv); |
| 137 | sd->ST= new_st; |
| 138 | } |
| 139 | |
| 140 | static int |
| 141 | mysql_status(short event) |
| 142 | { |
| 143 | int status= 0; |
| 144 | if (event & EV_READ) |
| 145 | status|= MYSQL_WAIT_READ; |
| 146 | if (event & EV_WRITE) |
| 147 | status|= MYSQL_WAIT_WRITE; |
| 148 | if (event & EV_TIMEOUT) |
| 149 | status|= MYSQL_WAIT_TIMEOUT; |
| 150 | return status; |
| 151 | } |
| 152 | |
| 153 | |
| 154 | static int num_active_connections; |
| 155 | |
| 156 | /* Shortcut for going to new state immediately without waiting. */ |
| 157 | #define NEXT_IMMEDIATE(sd_, new_st) do { sd_->ST= new_st; goto again; } while (0) |
| 158 | |
| 159 | static void |
| 160 | state_machine_handler(int fd __attribute__((unused)), short event, void *arg) |
| 161 | { |
| 162 | struct state_data *sd= arg; |
| 163 | int status; |
| 164 | |
| 165 | again: |
| 166 | switch(sd->ST) |
| 167 | { |
| 168 | case 0: |
| 169 | /* Initial state, start making the connection. */ |
| 170 | status= mysql_real_connect_start(&sd->ret, &sd->mysql, opt_host, opt_user, opt_password, opt_db, opt_port, opt_socket, 0); |
| 171 | if (status) |
| 172 | /* Wait for connect to complete. */ |
| 173 | next_event(1, status, sd); |
| 174 | else |
| 175 | NEXT_IMMEDIATE(sd, 9); |
| 176 | break; |
| 177 | |
| 178 | case 1: |
| 179 | status= mysql_real_connect_cont(&sd->ret, &sd->mysql, mysql_status(event)); |
| 180 | if (status) |
| 181 | next_event(1, status, sd); |
| 182 | else |
| 183 | NEXT_IMMEDIATE(sd, 9); |
| 184 | break; |
| 185 | |
| 186 | case 9: |
| 187 | if (!sd->ret) |
| 188 | fatal(sd, "Failed to mysql_real_connect()" ); |
| 189 | NEXT_IMMEDIATE(sd, 10); |
| 190 | break; |
| 191 | |
| 192 | case 10: |
| 193 | /* Now run the next query. */ |
| 194 | sd->query_element= query_list; |
| 195 | if (!sd->query_element) |
| 196 | { |
| 197 | /* No more queries, end the connection. */ |
| 198 | NEXT_IMMEDIATE(sd, 40); |
| 199 | } |
| 200 | query_list= query_list->next; |
| 201 | |
| 202 | sd->index= sd->query_element->index; |
| 203 | printf("%d ! %s\n" , sd->index, sd->query_element->query); |
| 204 | status= mysql_real_query_start(&sd->err, &sd->mysql, sd->query_element->query, |
| 205 | strlen(sd->query_element->query)); |
| 206 | if (status) |
| 207 | next_event(11, status, sd); |
| 208 | else |
| 209 | NEXT_IMMEDIATE(sd, 20); |
| 210 | break; |
| 211 | |
| 212 | case 11: |
| 213 | status= mysql_real_query_cont(&sd->err, &sd->mysql, mysql_status(event)); |
| 214 | if (status) |
| 215 | next_event(11, status, sd); |
| 216 | else |
| 217 | NEXT_IMMEDIATE(sd, 20); |
| 218 | break; |
| 219 | |
| 220 | case 20: |
| 221 | my_free(sd->query_element->query); |
| 222 | my_free(sd->query_element); |
| 223 | if (sd->err) |
| 224 | { |
| 225 | printf("%d | Error: %s\n" , sd->index, mysql_error(&sd->mysql)); |
| 226 | NEXT_IMMEDIATE(sd, 10); |
| 227 | } |
| 228 | else |
| 229 | { |
| 230 | sd->result= mysql_use_result(&sd->mysql); |
| 231 | if (!sd->result) |
| 232 | fatal(sd, "mysql_use_result() returns error" ); |
| 233 | NEXT_IMMEDIATE(sd, 30); |
| 234 | } |
| 235 | break; |
| 236 | |
| 237 | case 30: |
| 238 | status= mysql_fetch_row_start(&sd->row, sd->result); |
| 239 | if (status) |
| 240 | next_event(31, status, sd); |
| 241 | else |
| 242 | NEXT_IMMEDIATE(sd, 39); |
| 243 | break; |
| 244 | |
| 245 | case 31: |
| 246 | status= mysql_fetch_row_cont(&sd->row, sd->result, mysql_status(event)); |
| 247 | if (status) |
| 248 | next_event(31, status, sd); |
| 249 | else |
| 250 | NEXT_IMMEDIATE(sd, 39); |
| 251 | break; |
| 252 | |
| 253 | case 39: |
| 254 | if (sd->row) |
| 255 | { |
| 256 | /* Got a row. */ |
| 257 | unsigned int i; |
| 258 | printf("%d - " , sd->index); |
| 259 | for (i= 0; i < mysql_num_fields(sd->result); i++) |
| 260 | printf("%s%s" , (i ? "\t" : "" ), (sd->row[i] ? sd->row[i] : "(null)" )); |
| 261 | printf ("\n" ); |
| 262 | NEXT_IMMEDIATE(sd, 30); |
| 263 | } |
| 264 | else |
| 265 | { |
| 266 | if (mysql_errno(&sd->mysql)) |
| 267 | { |
| 268 | /* An error occurred. */ |
| 269 | printf("%d | Error: %s\n" , sd->index, mysql_error(&sd->mysql)); |
| 270 | } |
| 271 | else |
| 272 | { |
| 273 | /* EOF. */ |
| 274 | printf("%d | EOF\n" , sd->index); |
| 275 | } |
| 276 | mysql_free_result(sd->result); |
| 277 | NEXT_IMMEDIATE(sd, 10); |
| 278 | } |
| 279 | break; |
| 280 | |
| 281 | case 40: |
| 282 | status= mysql_close_start(&sd->mysql); |
| 283 | if (status) |
| 284 | next_event(41, status, sd); |
| 285 | else |
| 286 | NEXT_IMMEDIATE(sd, 50); |
| 287 | break; |
| 288 | |
| 289 | case 41: |
| 290 | status= mysql_close_cont(&sd->mysql, mysql_status(event)); |
| 291 | if (status) |
| 292 | next_event(41, status, sd); |
| 293 | else |
| 294 | NEXT_IMMEDIATE(sd, 50); |
| 295 | break; |
| 296 | |
| 297 | case 50: |
| 298 | /* We are done! */ |
| 299 | num_active_connections--; |
| 300 | if (num_active_connections == 0) |
| 301 | event_loopbreak(); |
| 302 | break; |
| 303 | |
| 304 | default: |
| 305 | abort(); |
| 306 | } |
| 307 | } |
| 308 | |
| 309 | |
| 310 | void |
| 311 | add_query(const char *q) |
| 312 | { |
| 313 | struct query_entry *e; |
| 314 | char *q2; |
| 315 | size_t len; |
| 316 | |
| 317 | e= my_malloc(sizeof(*e), MYF(0)); |
| 318 | q2= my_strdup(q, MYF(0)); |
| 319 | if (!e || !q2) |
| 320 | fatal(NULL, "Out of memory" ); |
| 321 | |
| 322 | /* Remove any trailing newline. */ |
| 323 | len= strlen(q2); |
| 324 | if (q2[len] == '\n') |
| 325 | q2[len--]= '\0'; |
| 326 | if (q2[len] == '\r') |
| 327 | q2[len--]= '\0'; |
| 328 | |
| 329 | e->next= NULL; |
| 330 | e->query= q2; |
| 331 | e->index= query_counter++; |
| 332 | *tail_ptr= e; |
| 333 | tail_ptr= &e->next; |
| 334 | } |
| 335 | |
| 336 | |
| 337 | static my_bool |
| 338 | handle_option(int optid, const struct my_option *opt __attribute__((unused)), |
| 339 | char *arg) |
| 340 | { |
| 341 | switch (optid) |
| 342 | { |
| 343 | case '?': |
| 344 | printf("Usage: async_queries [OPTIONS] query ...\n" ); |
| 345 | my_print_help(options); |
| 346 | my_print_variables(options); |
| 347 | exit(0); |
| 348 | break; |
| 349 | |
| 350 | case 'p': |
| 351 | if (arg) |
| 352 | opt_password= arg; |
| 353 | else |
| 354 | tty_password= 1; |
| 355 | break; |
| 356 | } |
| 357 | |
| 358 | return 0; |
| 359 | } |
| 360 | |
| 361 | |
| 362 | int |
| 363 | main(int argc, char *argv[]) |
| 364 | { |
| 365 | struct state_data *sds; |
| 366 | unsigned int i; |
| 367 | int err; |
| 368 | struct event_base *libevent_base; |
| 369 | |
| 370 | err= handle_options(&argc, &argv, options, handle_option); |
| 371 | if (err) |
| 372 | exit(err); |
| 373 | if (tty_password) |
| 374 | opt_password= get_tty_password(NullS); |
| 375 | |
| 376 | if (opt_query_file) |
| 377 | { |
| 378 | FILE *f= fopen(opt_query_file, "r" ); |
| 379 | char buf[65536]; |
| 380 | if (!f) |
| 381 | fatal(NULL, "Cannot open query file" ); |
| 382 | while (!feof(f)) |
| 383 | { |
| 384 | if (!fgets(buf, sizeof(buf), f)) |
| 385 | break; |
| 386 | add_query(buf); |
| 387 | } |
| 388 | fclose(f); |
| 389 | } |
| 390 | /* Add extra queries directly on command line. */ |
| 391 | while (argc > 0) |
| 392 | { |
| 393 | --argc; |
| 394 | add_query(*argv++); |
| 395 | } |
| 396 | |
| 397 | sds= my_malloc(opt_connections * sizeof(*sds), MYF(0)); |
| 398 | if (!sds) |
| 399 | fatal(NULL, "Out of memory" ); |
| 400 | |
| 401 | libevent_base= event_init(); |
| 402 | |
| 403 | err= mysql_library_init(argc, argv, (char **)my_groups); |
| 404 | if (err) |
| 405 | { |
| 406 | fprintf(stderr, "Fatal: mysql_library_init() returns error: %d\n" , err); |
| 407 | exit(1); |
| 408 | } |
| 409 | |
| 410 | num_active_connections= 0; |
| 411 | for (i= 0; i < opt_connections; i++) |
| 412 | { |
| 413 | mysql_init(&sds[i].mysql); |
| 414 | mysql_options(&sds[i].mysql, MYSQL_OPT_NONBLOCK, 0); |
| 415 | mysql_options(&sds[i].mysql, MYSQL_READ_DEFAULT_GROUP, "async_queries" ); |
| 416 | |
| 417 | /* |
| 418 | We put the initial connect call in the first state 0 of the state machine |
| 419 | and run that manually, just to have everything in one place. |
| 420 | */ |
| 421 | sds[i].ST= 0; |
| 422 | num_active_connections++; |
| 423 | state_machine_handler(-1, -1, &sds[i]); |
| 424 | } |
| 425 | |
| 426 | event_dispatch(); |
| 427 | |
| 428 | my_free(sds); |
| 429 | |
| 430 | mysql_library_end(); |
| 431 | |
| 432 | event_base_free(libevent_base); |
| 433 | |
| 434 | return 0; |
| 435 | } |
| 436 | |