1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/* author: M Kersten
10 * Progress indicator
11 * tachograph -d demo
12 * which connects to the demo database server and presents a server progress bar.
13*/
14
15#include "monetdb_config.h"
16#include "stream.h"
17#include "stream_socket.h"
18#include "mapi.h"
19#include <string.h>
20#include <sys/stat.h>
21#include <signal.h>
22#ifdef HAVE_UNISTD_H
23# include <unistd.h>
24#endif
25#include <math.h>
26#include "mprompt.h"
27#include "dotmonetdb.h"
28#include "eventparser.h"
29
30#ifndef HAVE_GETOPT_LONG
31# include "monet_getopt.h"
32#else
33# ifdef HAVE_GETOPT_H
34# include "getopt.h"
35# endif
36#endif
37
38#ifdef HAVE_NETDB_H
39# include <netdb.h>
40# include <netinet/in.h>
41#endif
42
43#ifndef INVALID_SOCKET
44#define INVALID_SOCKET (-1)
45#endif
46
47
/*
 * Report a failed MAPI operation on stderr and jump to the label
 * `stop_disconnect`, which the function using this macro must provide.
 * The most specific diagnostic available is used: the query handle if
 * there is one, else the connection handle, else a generic message.
 */
#define die(dbh, hdl) \
	do { \
		if (hdl) \
			mapi_explain_query(hdl, stderr); \
		else if (dbh) \
			mapi_explain(dbh, stderr); \
		else \
			fprintf(stderr, "!! command failed\n"); \
		goto stop_disconnect; \
	} while (0)
58
/*
 * Send statement X over the connection `dbh` and keep the resulting
 * handle in `hdl` (both names are resolved at the point of expansion).
 * When no handle is returned, or the connection reports anything other
 * than MOK, die() is invoked, which jumps to `stop_disconnect`.
 */
#define doQ(X) \
	do { \
		if ((hdl = mapi_query(dbh, X)) == NULL || \
		    mapi_error(dbh) != MOK) \
			die(dbh, hdl); \
	} while (0)
65
/* stream the profiler events are read from; set from mapi_get_from(dbh) in main */
static stream *conn = NULL;
/* name of the local host, filled in by gethostname() in main */
static char hostname[128];
/* database name taken from the configuration file or the -d option */
static char *dbname;
/* connection handle used for all MAPI calls */
static Mapi dbh;
/* handle of the statement most recently issued through doQ */
static MapiHdl hdl = NULL;
/* non-zero while the events of a top level function are being tracked */
static int capturing=0;
/* highest pc seen in an event since the last reset */
static int lastpc;
/* pc of the event whose statement is shown behind the bar; -1 after a reset */
static int showonbar;
/* number of finished instructions counted towards the progress percentage */
static int pccount;
75
/* values of Event.state; a zero state means the slot has not been started */
#define RUNNING 1
#define FINISHED 2
/* Bookkeeping for one instruction of the tracked query, indexed by its pc. */
typedef struct{
	int state;		/* RUNNING or FINISHED */
	int64_t etc;		/* ticks value carried by the start event */
	int64_t actual;		/* ticks value carried by the done event */
	int64_t clkticks;	/* clock of the start event, relative to starttime */
	char *stmt;		/* heap copy of the statement text of the start event */
} Event;

/* event table, allocated in update() when a querylog.define call is seen */
Event *events;
/* number of entries in events, taken from malsize at allocation time */
static int maxevents;
88
/* Pair of heap allocated strings; both are released by resetTachograph(). */
typedef struct {
	char *varname;
	char *source;
} Source;
/* NOTE(review): nothing in this file allocates or fills this array - confirm it is still needed */
Source *sources; // original column name
/* srctop: number of entries of sources released on reset;
 * srcmax: not referenced elsewhere in this file, presumably the capacity - TODO confirm */
int srctop, srcmax;
95
96/*
97 * Parsing the argument list of a MAL call to obtain un-quoted string values
98 */
99
/*
 * Print the command line synopsis on stderr.
 *
 * The function only prints; terminating the program is the caller's job.
 * Every call site in main already follows the call with its own exit(),
 * and the -?/--help path must be able to exit with status 0.  Exiting
 * here with -1 made that status selection unreachable.
 */
static void
usageTachograph(void)
{
	fprintf(stderr, "tachograph [options] \n");
	fprintf(stderr, " -d | --dbname=<database_name>\n");
	fprintf(stderr, " -u | --user=<user>\n");
	fprintf(stderr, " -p | --port=<portnr>\n");
	fprintf(stderr, " -h | --host=<hostname>\n");
	fprintf(stderr, " -? | --help\n");
}
111
112/* Any signal should be captured and turned into a graceful
113 * termination of the profiling session. */
114
115static void
116stopListening(int i)
117{
118 fprintf(stderr,"signal %d received\n",i);
119 if( dbh)
120 doQ("profiler.stop();");
121stop_disconnect:
122 // show follow up action only once
123 if(dbh)
124 mapi_disconnect(dbh);
125 exit(0);
126}
127
/* heap copy of the name of the top level function being tracked, set in update() */
char *currentfunction= 0;
int currenttag; // tag of the tracked function; events carrying another tag are ignored
/* clock of the start event of the tracked function; later clocks are made relative to it */
int64_t starttime = 0;
/* starttime plus the ticks of that start event; only written in this file */
int64_t finishtime = 0;
/* ticks of the start event; showBar() uses it to print the time remaining */
int64_t duration =0;
/* heap copy of the query text printed last, used to avoid printing it again */
char *prevquery= 0;
int prevprogress =0;// pc of previous progress display (NOTE(review): never assigned a non-zero value in this file)
/* percentage drawn by the previous showBar() call */
int prevlevel =0;
/* number of characters showBar() printed behind the bar, needed to erase them */
size_t txtlength=0;

// limit the number of separate queries in the pool
#define QUERYPOOL 32
/* pool size; the -q option overrides the default with a value of at least 1 */
static int querypool = QUERYPOOL;
/* advanced modulo querypool on every reset */
int queryid= 0;
142
143static void resetTachograph(void){
144 int i;
145 if (debug)
146 fprintf(stderr, "RESET tachograph\n");
147 if( prevprogress)
148 printf("\n");
149 for(i=0; i < maxevents; i++)
150 if( events[i].stmt)
151 free(events[i].stmt);
152 for(i=0; i< srctop; i++){
153 free(sources[i].varname);
154 free(sources[i].source);
155 }
156 capturing = 0;
157 srctop=0;
158 currentfunction = 0;
159 currenttag = 0;
160 starttime = 0;
161 finishtime = 0;
162 duration =0;
163 prevprogress = 0;
164 txtlength =0;
165 prevlevel=0;
166 lastpc = 0;
167 showonbar = -1;
168 pccount = 0;
169 fflush(stdout);
170 events = 0;
171 queryid = (queryid+1) % querypool;
172}
173
/* result buffer of rendertime() */
static char stamp[BUFSIZ]={0};

/*
 * Format a time span given in microseconds into `stamp`.
 *
 * ticks: the time span in microseconds; 0 yields the text "unknown ".
 * flg:   non-zero appends the microsecond fraction (hh:mm:ss.uuuuuu),
 *        zero gives hh:mm:ss only.
 */
static void
rendertime(int64_t ticks, int flg)
{
	int t, hr, min, sec;

	if (ticks == 0) {
		strcpy(stamp,"unknown ");
		return;
	}
	t = (int) (ticks / 1000000);
	sec = t % 60;
	min = (t / 60) % 60;
	hr = t / 3600;
	if (flg) {
		/* take the remainder in 64 bits first: casting ticks to int
		 * before the modulo corrupts the fraction for large values */
		snprintf(stamp, BUFSIZ, "%02d:%02d:%02d.%06d", hr, min, sec,
			 (int) (ticks % 1000000));
	} else {
		snprintf(stamp, BUFSIZ, "%02d:%02d:%02d", hr, min, sec);
	}
}
193
194#define MSGLEN 100
195
196
197static void
198showBar(int level, int64_t clk, char *stmt)
199{
200 int64_t i =0, nl;
201 size_t stamplen=0;
202
203 nl = level/2-prevlevel/2;
204 if( level != 100 && (nl == 0 || level/2 <= prevlevel/2))
205 return;
206 static_assert(MSGLEN < BUFSIZ, "MSGLEN too small");
207 if(prevlevel == 0)
208 printf("[");
209 else
210 for( i= 50 - prevlevel/2 +txtlength; i>0; i--)
211 printf("\b \b");
212 for( i=0 ; i< nl ; i++)
213 putchar('#');
214 for( ; i < 50-prevlevel/2; i++)
215 putchar('.');
216 putchar(level ==100?']':'>');
217 printf(" %3d%%",level);
218 if( level == 100 || duration == 0){
219 rendertime(clk,1);
220 printf(" %s ",stamp);
221 stamplen= strlen(stamp)+3;
222 } else
223 if( duration && duration- clk > 0){
224 rendertime(duration - clk,0);
225 printf(" %s ETC ", stamp);
226 stamplen= strlen(stamp)+3;
227 } else
228 if( duration && duration- clk < 0){
229 rendertime(clk - duration ,0);
230 printf(" +%s ETC ",stamp);
231 stamplen= strlen(stamp)+3;
232 }
233 if( stmt)
234 printf("%s",stmt);
235 fflush(stdout);
236 txtlength = 11 + stamplen + strlen(stmt?stmt:"");
237 prevlevel = level;
238}
239
/* Placeholder without any effect; update() calls it when a tracked
 * function is entered and again after it has finished. */
static void
initFiles(void)
{
}
244
245static void
246update(EventRecord *ev)
247{
248 int progress=0;
249 int i;
250 int uid = 0,qid = 0;
251
252 /* handle a ping event, keep the current instruction in focus */
253 if (ev->state >= MDB_PING ) {
254 // All state events are ignored
255 return;
256 }
257
258 if (debug)
259 fprintf(stderr, "Update %s input %s stmt %s time %" PRId64"\n",(ev->state>=0?statenames[ev->state]:"unknown"),(ev->fcn?ev->fcn:"(null)"),(currentfunction?currentfunction:""),ev->clkticks -starttime);
260
261 if (starttime == 0) {
262 if (ev->fcn == 0 ) {
263 if (debug)
264 fprintf(stderr, "Skip %s input\n",(ev->state>=0?statenames[ev->state]:"unknown"));
265 return;
266 }
267 if (debug)
268 fprintf(stderr, "Start capturing updates %s\n",ev->fcn);
269 }
270 if (ev->clkticks < 0) {
271 /* HACK: *TRY TO* compensate for the fact that the MAL
272 * profiler chops-off day information, and assume that
273 * clkticks is < starttime because the tomograph run
274 * crossed a day boundary (midnight);
275 * we simply add 1 day (24 hours) worth of microseconds.
276 * NOTE: this surely does NOT work correctly if the
277 * tomograph run takes 24 hours or more ...
278 */
279 ev->clkticks += US_DD;
280 }
281
282 /* monitor top level function brackets, we restrict ourselves to SQL queries */
283 if (ev->state == MDB_START && ev->fcn && strncmp(ev->fcn, "function", 8) == 0) {
284 if( capturing){
285 //fprintf(stderr,"Input garbled or we lost some events\n");
286 resetTachograph();
287 capturing = 0;
288 }
289 if( (i = sscanf(ev->fcn + 9,"user.s%d_%d",&uid,&qid)) != 2){
290 if( debug)
291 fprintf(stderr,"Skip parsing %d, uid %d qid %d\n",i,uid,qid);
292 return;
293 }
294 if (capturing++ == 0){
295 starttime = ev->clkticks;
296 finishtime = ev->clkticks + ev->ticks;
297 duration = ev->ticks;
298 }
299 if (currentfunction == 0){
300 currentfunction = strdup(ev->fcn+9);
301 currenttag = ev->tag;
302 }
303 if (debug)
304 fprintf(stderr, "Enter function %s capture %d\n", currentfunction, capturing);
305 initFiles();
306 return;
307 }
308 ev->clkticks -= starttime;
309
310 if ( !capturing)
311 return;
312
313 if( ev->pc > lastpc)
314 lastpc = ev->pc;
315
316 /* start of instruction box */
317 if (ev->state == MDB_START ) {
318 if(ev->fcn && strstr(ev->fcn,"querylog.define") ){
319 // extract a string argument from a known MAL signature
320 maxevents = malsize;
321 events = calloc(maxevents, sizeof(Event));
322 // use the truncated query text, beware that the \ is already escaped in the call argument.
323 if(currentquery) {
324 if( prevquery && strcmp(currentquery,prevquery)){
325 printf("%s\n",currentquery);
326 free(prevquery);
327 prevquery = strdup(currentquery);
328 } else
329 if( prevquery == 0){
330 printf("%s\n",currentquery);
331 prevquery = strdup(currentquery);
332 }
333 }
334 }
335 if( ev->tag != currenttag)
336 return; // forget all except one query
337 assert(ev->pc < maxevents);
338 events[ev->pc].state = RUNNING;
339 events[ev->pc].stmt = strdup(ev->beauty? ev->beauty:"");
340 events[ev->pc].etc = ev->ticks;
341 events[ev->pc].clkticks = ev->clkticks;
342 showonbar = ev->pc;
343 return;
344 }
345 /* end the instruction box */
346 if (ev->state == MDB_DONE ){
347
348 events[ev->pc].state= FINISHED;
349 if( ev->tag != currenttag)
350 return; // forget all except one query
351
352 progress = (int)(pccount++ / (malsize/100.0));
353 for(i = lastpc; i > 0; i--)
354 if( events[i].state == RUNNING )
355 break;
356
357 if( showonbar == ev->pc)
358 showonbar = i < 0 ?-1 : i;
359 showBar((progress>100.0?(int)100:progress), ev->clkticks, (showonbar >= 0 ? events[showonbar].stmt:NULL));
360 events[ev->pc].actual= ev->ticks;
361 }
362 if (ev->state == MDB_DONE && ev->fcn && strncmp(ev->fcn, "function", 8) == 0) {
363 if (currentfunction && strcmp(currentfunction, ev->fcn+9) == 0) {
364 if( capturing == 0){
365 free(currentfunction);
366 currentfunction = 0;
367 }
368
369 showBar(100,ev->clkticks, "\n");
370 if(debug)
371 fprintf(stderr, "Leave function %s capture %d\n", currentfunction, capturing);
372 resetTachograph();
373 initFiles();
374 }
375 }
376}
377
378int
379main(int argc, char **argv)
380{
381 ssize_t n;
382 size_t len, buflen;
383 char *host = NULL;
384 int portnr = 0;
385 char *uri = NULL;
386 char *user = NULL;
387 char *password = NULL;
388 char buf[BUFSIZ], *buffer, *e, *response;
389 int done = 0;
390 EventRecord event;
391
392 static struct option long_options[15] = {
393 { "dbname", 1, 0, 'd' },
394 { "user", 1, 0, 'u' },
395 { "port", 1, 0, 'p' },
396 { "password", 1, 0, 'P' },
397 { "host", 1, 0, 'h' },
398 { "help", 0, 0, '?' },
399 { "output", 1, 0, 'o' },
400 { "queries", 1, 0, 'q' },
401 { "debug", 0, 0, 'D' },
402 { 0, 0, 0, 0 }
403 };
404
405 /* parse config file first, command line options override */
406 parse_dotmonetdb(&user, &password, &dbname, NULL, NULL, NULL, NULL);
407
408 while (1) {
409 int option_index = 0;
410 int c = getopt_long(argc, argv, "d:u:p:P:h:?:o:q:D",
411 long_options, &option_index);
412 if (c == -1)
413 break;
414 switch (c) {
415 case 'D':
416 debug = 1;
417 break;
418 case 'd':
419 if (dbname)
420 free(dbname);
421 dbname = strdup(optarg);
422 break;
423 case 'u':
424 if (user)
425 free(user);
426 user = strdup(optarg);
427 /* force password prompt */
428 if (password)
429 free(password);
430 password = NULL;
431 break;
432 case 'P':
433 if (password)
434 free(password);
435 password = strdup(optarg);
436 break;
437 case 'p':
438 if (optarg)
439 portnr = atoi(optarg);
440 break;
441 case 'q':
442 if (optarg)
443 querypool = atoi(optarg) > 0? atoi(optarg):1;
444 break;
445 case 'h':
446 host = optarg;
447 break;
448 case '?':
449 usageTachograph();
450 /* a bit of a hack: look at the option that the
451 current `c' is based on and see if we recognize
452 it: if -? or --help, exit with 0, else with -1 */
453 exit(strcmp(argv[optind - 1], "-?") == 0 || strcmp(argv[optind - 1], "--help") == 0 ? 0 : -1);
454 default:
455 usageTachograph();
456 exit(-1);
457 }
458 }
459
460 if( dbname == NULL){
461 usageTachograph();
462 exit(-1);
463 }
464
465 if (dbname != NULL && strncmp(dbname, "mapi:monetdb://", 15) == 0) {
466 uri = dbname;
467 dbname = NULL;
468 }
469
470#ifdef SIGPIPE
471 signal(SIGPIPE, stopListening);
472#endif
473#ifdef SIGHUP
474 signal(SIGHUP, stopListening);
475#endif
476#ifdef SIGQUIT
477 signal(SIGQUIT, stopListening);
478#endif
479 signal(SIGINT, stopListening);
480 signal(SIGTERM, stopListening);
481 close(0);
482
483 if (user == NULL)
484 user = simple_prompt("user", BUFSIZ, 1, prompt_getlogin());
485 if (password == NULL)
486 password = simple_prompt("password", BUFSIZ, 0, NULL);
487
488 /* our hostname, how remote servers have to contact us */
489 gethostname(hostname, sizeof(hostname));
490
491 /* set up the profiler */
492 if (uri)
493 dbh = mapi_mapiuri(uri, user, password, "mal");
494 else
495 dbh = mapi_mapi(host, portnr, user, password, "mal", dbname);
496 if (dbh == NULL || mapi_error(dbh))
497 die(dbh, hdl);
498 mapi_reconnect(dbh);
499 if (mapi_error(dbh))
500 die(dbh, hdl);
501 host = strdup(mapi_get_host(dbh));
502 if(debug)
503 fprintf(stderr,"-- connection with server %s\n", uri ? uri : host);
504
505 snprintf(buf,BUFSIZ-1,"profiler.setheartbeat(0);");
506 if( debug)
507 fprintf(stderr,"-- %s\n",buf);
508 doQ(buf);
509
510 snprintf(buf, BUFSIZ, " profiler.openstream(0);");
511 if( debug)
512 fprintf(stderr,"-- %s\n",buf);
513 doQ(buf);
514
515 len = 0;
516 buflen = BUFSIZ;
517 buffer = (char *) malloc(buflen);
518 if( buffer == NULL){
519 fprintf(stderr,"Could not create input buffer\n");
520 exit(-1);
521 }
522 conn = mapi_get_from(dbh);
523 while ((n = mnstr_read(conn, buffer + len, 1, buflen - len-1)) >= 0) {
524 if (n == 0 &&
525 (n = mnstr_read(conn, buffer + len, 1, buflen - len-1)) <= 0)
526 break;
527 buffer[len + n] = 0;
528 response = buffer;
529 while ((e = strchr(response, '\n')) != NULL) {
530 *e = 0;
531 if(debug)
532 printf("%s\n", response);
533 done= keyvalueparser(response, &event);
534 if( done == 1){
535 update(&event);
536 } else if( done == 0){
537 if (debug )
538 fprintf(stderr, "PARSE %d:%s\n", done, response);
539 }
540 response = e + 1;
541 }
542 /* handle the case that the line is too long to
543 * fit in the buffer */
544 if( response == buffer){
545 char *new = (char *) realloc(buffer, buflen + BUFSIZ);
546 if( new == NULL){
547 fprintf(stderr,"Could not extend input buffer\n");
548 exit(-1);
549 }
550 buffer = new;
551 buflen += BUFSIZ;
552 len += n;
553 }
554 /* handle the case the buffer contains more than one
555 * line, and the last line is not completely read yet.
556 * Copy the first part of the incomplete line to the
557 * beginning of the buffer */
558 else if (*response) {
559 if (debug)
560 printf("LASTLINE:%s", response);
561 len = strlen(response);
562 snprintf(buffer, len + 1, "%s", response);
563 } else /* reset this line of buffer */
564 len = 0;
565 }
566
567 doQ("profiler.stop();");
568stop_disconnect:
569 if(dbh)
570 mapi_disconnect(dbh);
571 printf("-- connection with server %s closed\n", uri ? uri : host);
572 return 0;
573}
574