1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/*
 * Clients gain access to the Monet server through an internet connection.
11 * Access through the internet requires a client program at the source,
12 * which addresses the default port of a running server. It is a textual
13 * interface for expert use.
14 *
15 * At the server side, each client is represented by a session record
16 * with the current status, such as name, file descriptors, namespace,
17 * and local stack. Each client session has a dedicated thread of
18 * control.
19 *
20 * The number of clients permitted concurrent access is a run time
21 * option.
22 *
23 * Client sessions remain in existence until the corresponding
24 * communication channels break.
25 *
26 * A client record is initialized upon acceptance of a connection. The
27 * client runs in his own thread of control until it finds a
28 * soft-termination request mode (FINISHCLIENT) or its IO file descriptors
29 * are closed. The latter generates an IO error, which leads to a safe
30 * termination.
31 *
32 * The system administrator client runs in the primary thread of control
33 * to simplify debugging with external debuggers.
34 *
35 * Searching a free client record is encapsulated in a critical section
36 * to hand them out one-at-a-time. Marking them as being claimed avoids
37 * any interference from parallel actions to obtain client records.
38 */
39
40/* (author) M.L. Kersten */
41#include "monetdb_config.h"
42#include "mal_client.h"
43#include "mal_import.h"
44#include "mal_parser.h"
45#include "mal_namespace.h"
46#include "mal_private.h"
47#include "mal_runtime.h"
48#include "mal_authorize.h"
49
50int MAL_MAXCLIENTS = 0;
51ClientRec *mal_clients;
52
53void
54mal_client_reset(void)
55{
56 MAL_MAXCLIENTS = 0;
57 if (mal_clients)
58 GDKfree(mal_clients);
59}
60
61bool
62MCinit(void)
63{
64 const char *max_clients = GDKgetenv("max_clients");
65 int maxclients = 0;
66
67 if (max_clients != NULL)
68 maxclients = atoi(max_clients);
69 if (maxclients <= 0) {
70 maxclients = 64;
71 if (GDKsetenv("max_clients", "64") != GDK_SUCCEED) {
72 fprintf(stderr, "!MCinit: GDKsetenv failed");
73 return false;
74 }
75 }
76
77 MAL_MAXCLIENTS = /* client connections */ maxclients;
78 mal_clients = GDKzalloc(sizeof(ClientRec) * MAL_MAXCLIENTS);
79 if( mal_clients == NULL){
80 fprintf(stderr,"!MCinit:" MAL_MALLOC_FAIL);
81 return false;
82 }
83 for (int i = 0; i < MAL_MAXCLIENTS; i++)
84 ATOMIC_INIT(&mal_clients[i].lastprint, 0);
85 return true;
86}
87
88/* stack the files from which you read */
89int
90MCpushClientInput(Client c, bstream *new_input, int listing, char *prompt)
91{
92 ClientInput *x = (ClientInput *) GDKmalloc(sizeof(ClientInput));
93 if (x == 0)
94 return -1;
95 x->fdin = c->fdin;
96 x->yycur = c->yycur;
97 x->listing = c->listing;
98 x->prompt = c->prompt;
99 x->next = c->bak;
100 c->bak = x;
101 c->fdin = new_input;
102 c->listing = listing;
103 c->prompt = prompt ? GDKstrdup(prompt) : GDKstrdup("");
104 if(c->prompt == 0) {
105 GDKfree(x);
106 return -1;
107 }
108 c->promptlength = strlen(c->prompt);
109 c->yycur = 0;
110 return 0;
111}
112
113void
114MCpopClientInput(Client c)
115{
116 ClientInput *x = c->bak;
117 if (c->fdin) {
118 /* missing protection against closing stdin stream */
119 bstream_destroy(c->fdin);
120 }
121 GDKfree(c->prompt);
122 c->fdin = x->fdin;
123 c->yycur = x->yycur;
124 c->listing = x->listing;
125 c->prompt = x->prompt;
126 c->promptlength = strlen(c->prompt);
127 c->bak = x->next;
128 GDKfree(x);
129}
130
131static Client
132MCnewClient(void)
133{
134 Client c;
135 MT_lock_set(&mal_contextLock);
136 for (c = mal_clients; c < mal_clients + MAL_MAXCLIENTS; c++) {
137 if (c->mode == FREECLIENT) {
138 c->mode = RUNCLIENT;
139 break;
140 }
141 }
142 MT_lock_unset(&mal_contextLock);
143
144 if (c == mal_clients + MAL_MAXCLIENTS)
145 return NULL;
146 c->idx = (int) (c - mal_clients);
147#ifdef MAL_CLIENT_DEBUG
148 fprintf(stderr,"New client created %d\n", (int) (c - mal_clients));
149#endif
150 return c;
151}
152
153/*
154 * You can always retrieve a client record using the thread identifier,
155 * because we maintain a 1-1 mapping between client and thread of
156 * control. Therefore, we don't need locks either.
157 * If the number of clients becomes too large, we have to change the
158 * allocation and lookup scheme.
159 *
160 * Finding a client record is tricky when we are spawning threads as
161 * co-workers. It is currently passed as an argument.
162 */
163
164Client
165MCgetClient(int id)
166{
167 if (id < 0 || id >= MAL_MAXCLIENTS)
168 return NULL;
169 return mal_clients + id;
170}
171
172/*
173 * The resetProfiler is called when the owner of the event stream
174 * leaves the scene. (Unclear if parallelism may cause errors)
175 */
176
177static void
178MCresetProfiler(stream *fdout)
179{
180 if (fdout != maleventstream)
181 return;
182 MT_lock_set(&mal_profileLock);
183 maleventstream = 0;
184 MT_lock_unset(&mal_profileLock);
185}
186
187void
188MCexitClient(Client c)
189{
190#ifdef MAL_CLIENT_DEBUG
191 fprintf(stderr,"# Exit client %d\n", c->idx);
192#endif
193 finishSessionProfiler(c);
194 MCresetProfiler(c->fdout);
195 if (c->father == NULL) { /* normal client */
196 if (c->fdout && c->fdout != GDKstdout) {
197 close_stream(c->fdout);
198 }
199 assert(c->bak == NULL);
200 if (c->fdin) {
201 /* protection against closing stdin stream */
202 if (c->fdin->s == GDKstdin)
203 c->fdin->s = NULL;
204 bstream_destroy(c->fdin);
205 }
206 c->fdout = NULL;
207 c->fdin = NULL;
208 }
209}
210
211static Client
212MCinitClientRecord(Client c, oid user, bstream *fin, stream *fout)
213{
214 const char *prompt;
215
216 c->user = user;
217 c->username = 0;
218 c->scenario = NULL;
219 c->oldscenario = NULL;
220 c->srcFile = NULL;
221 c->blkmode = 0;
222
223 c->fdin = fin ? fin : bstream_create(GDKstdin, 0);
224 if ( c->fdin == NULL){
225 MT_lock_set(&mal_contextLock);
226 c->mode = FREECLIENT;
227 MT_lock_unset(&mal_contextLock);
228 fprintf(stderr,"!initClientRecord:" MAL_MALLOC_FAIL);
229 return NULL;
230 }
231 c->yycur = 0;
232 c->bak = NULL;
233
234 c->listing = 0;
235 c->fdout = fout ? fout : GDKstdout;
236 c->curprg = c->backup = 0;
237 c->glb = 0;
238
239 /* remove garbage from previous connection
240 * be aware, a user can introduce several modules
241 * that should be freed to avoid memory leaks */
242 c->usermodule = c->curmodule = 0;
243
244 c->father = NULL;
245 c->idle = c->login = c->lastcmd = time(0);
246 c->session = GDKusec();
247 strcpy_len(c->optimizer, "default_pipe", sizeof(c->optimizer));
248 c->workerlimit = 0;
249 c->memorylimit = 0;
250 c->querytimeout = 0;
251 c->sessiontimeout = 0;
252 c->itrace = 0;
253 c->errbuf = 0;
254
255 prompt = PROMPT1;
256 c->prompt = GDKstrdup(prompt);
257 if ( c->prompt == NULL){
258 if (fin == NULL) {
259 c->fdin->s = NULL;
260 bstream_destroy(c->fdin);
261 MT_lock_set(&mal_contextLock);
262 c->mode = FREECLIENT;
263 MT_lock_unset(&mal_contextLock);
264 }
265 fprintf(stderr, "!initClientRecord:" MAL_MALLOC_FAIL);
266 return NULL;
267 }
268 c->promptlength = strlen(prompt);
269
270 c->actions = 0;
271 c->profticks = c->profstmt = NULL;
272 c->error_row = c->error_fld = c->error_msg = c->error_input = NULL;
273 c->sqlprofiler = 0;
274 c->wlc_kind = 0;
275 c->wlc = NULL;
276#ifndef HAVE_EMBEDDED /* no authentication in embedded mode */
277 {
278 str msg = AUTHgetUsername(&c->username, c);
279 if (msg) /* shouldn't happen */
280 freeException(msg);
281 }
282#endif
283 c->blocksize = BLOCK;
284 c->protocol = PROTOCOL_9;
285
286 c->filetrans = false;
287 c->getquery = NULL;
288
289 char name[16];
290 snprintf(name, sizeof(name), "Client%d->s", (int) (c - mal_clients));
291 MT_sema_init(&c->s, 0, name);
292 return c;
293}
294
295Client
296MCinitClient(oid user, bstream *fin, stream *fout)
297{
298 Client c = NULL;
299
300 if ((c = MCnewClient()) == NULL)
301 return NULL;
302 return MCinitClientRecord(c, user, fin, fout);
303}
304
305
306/*
307 * The administrator should be initialized to enable interpretation of
308 * the command line arguments, before it starts servicing statements
309 */
310int
311MCinitClientThread(Client c)
312{
313 Thread t;
314
315 t = MT_thread_getdata(); /* should succeed */
316 if (t == NULL) {
317 MCresetProfiler(c->fdout);
318 return -1;
319 }
320 /*
321 * The GDK thread administration should be set to reflect use of
322 * the proper IO descriptors.
323 */
324 t->data[1] = c->fdin;
325 t->data[0] = c->fdout;
326 c->mythread = t;
327 c->errbuf = GDKerrbuf;
328 if (c->errbuf == NULL) {
329 char *n = GDKzalloc(GDKMAXERRLEN);
330 if ( n == NULL){
331 MCresetProfiler(c->fdout);
332 return -1;
333 }
334 GDKsetbuf(n);
335 c->errbuf = GDKerrbuf;
336 } else
337 c->errbuf[0] = 0;
338 return 0;
339}
340
341/*
342 * Forking is a relatively cheap way to create a new client. The new
343 * client record shares the IO descriptors. To avoid interference, we
344 * limit children to only produce output by closing the input-side.
345 *
346 * If the father itself is a temporary client, let the new child depend
347 * on the grandfather.
348 */
349Client
350MCforkClient(Client father)
351{
352 Client son = NULL;
353 str prompt;
354
355 if (father == NULL)
356 return NULL;
357 if (father->father != NULL)
358 father = father->father;
359 if((prompt = GDKstrdup(father->prompt)) == NULL)
360 return NULL;
361 if ((son = MCinitClient(father->user, father->fdin, father->fdout))) {
362 son->fdin = NULL;
363 son->fdout = father->fdout;
364 son->bak = NULL;
365 son->yycur = 0;
366 son->father = father;
367 son->login = father->login;
368 son->idle = father->idle;
369 son->scenario = father->scenario;
370 strcpy_len(father->optimizer, son->optimizer, sizeof(father->optimizer));
371 son->workerlimit = father->workerlimit;
372 son->memorylimit = father->memorylimit;
373 son->querytimeout = father->querytimeout;
374 son->sessiontimeout = father->sessiontimeout;
375
376 if (son->prompt)
377 GDKfree(son->prompt);
378 son->prompt = prompt;
379 son->promptlength = strlen(prompt);
380 /* reuse the scopes wherever possible */
381 if (son->usermodule == 0) {
382 son->usermodule = userModule();
383 if(son->usermodule == 0) {
384 MCcloseClient(son);
385 return NULL;
386 }
387 }
388 } else {
389 GDKfree(prompt);
390 }
391 return son;
392}
393
394/*
395 * When a client needs to be terminated then the file descriptors for
396 * its input/output are simply closed. This leads to a graceful
397 * degradation, but may take some time when the client is busy. A more
 * forceful method is to kill the client thread, but this may leave
399 * locks and semaphores in an undesirable state.
400 *
401 * The routine freeClient ends a single client session, but through side
402 * effects of sharing IO descriptors, also its children. Conversely, a
403 * child can not close a parent.
404 */
405void
406MCfreeClient(Client c)
407{
408 if( c->mode == FREECLIENT)
409 return;
410 c->mode = FINISHCLIENT;
411
412#ifdef MAL_CLIENT_DEBUG
413 fprintf(stderr,"# Free client %d\n", c->idx);
414#endif
415 MCexitClient(c);
416
417 /* scope list and curprg can not be removed, because the client may
418 * reside in a quit() command. Therefore the scopelist is re-used.
419 */
420 c->scenario = NULL;
421 if (c->prompt)
422 GDKfree(c->prompt);
423 c->prompt = NULL;
424 c->promptlength = -1;
425 if (c->errbuf) {
426/* no client threads in embedded mode */
427#ifndef HAVE_EMBEDDED
428 GDKsetbuf(0);
429#endif
430 if (c->father == NULL)
431 GDKfree(c->errbuf);
432 c->errbuf = 0;
433 }
434 if (c->usermodule)
435 freeModule(c->usermodule);
436 c->usermodule = c->curmodule = 0;
437 c->father = 0;
438 c->idle = c->login = c->lastcmd = 0;
439 strcpy_len(c->optimizer, "default_pipe", sizeof(c->optimizer));
440 c->workerlimit = 0;
441 c->memorylimit = 0;
442 c->querytimeout = 0;
443 c->sessiontimeout = 0;
444 c->user = oid_nil;
445 if( c->username){
446 GDKfree(c->username);
447 c->username = 0;
448 }
449 c->mythread = 0;
450 if (c->glb) {
451 freeStack(c->glb);
452 c->glb = NULL;
453 }
454 if( c->profticks){
455 BBPunfix(c->profticks->batCacheid);
456 BBPunfix(c->profstmt->batCacheid);
457 c->profticks = c->profstmt = NULL;
458 }
459 if( c->error_row){
460 BBPunfix(c->error_row->batCacheid);
461 BBPunfix(c->error_fld->batCacheid);
462 BBPunfix(c->error_msg->batCacheid);
463 BBPunfix(c->error_input->batCacheid);
464 c->error_row = c->error_fld = c->error_msg = c->error_input = NULL;
465 }
466 if( c->wlc)
467 freeMalBlk(c->wlc);
468 c->sqlprofiler = 0;
469 c->wlc_kind = 0;
470 c->wlc = NULL;
471 MT_sema_destroy(&c->s);
472 c->mode = MCshutdowninprogress()? BLOCKCLIENT: FREECLIENT;
473}
474
475/*
476 * If a client disappears from the scene (eof on stream), we should
 * terminate all its children. This is in principle a forceful action,
478 * because the children may be ignoring the primary IO streams.
479 * (Instead they may be blocked in an infinite loop)
480 *
481 * Special care should be taken by closing the 'adm' thread. It is
482 * permitted to leave only when it is the sole user of the system.
483 *
484 * Furthermore, once we enter closeClient, the process in which it is
485 * raised has already lost its file descriptors.
486 *
487 * When the server is about to shutdown, we should softly terminate
 * all outstanding sessions.
489 */
/* Shutdown flag: 0 initially, set to 1 by MCstopClients. */
static volatile int shutdowninprogress = 0;

/* Return the current value of the shutdown flag (non-zero once set). */
int
MCshutdowninprogress(void)
{
	return shutdowninprogress;
}
496
497void
498MCstopClients(Client cntxt)
499{
500 Client c = mal_clients;
501
502 MT_lock_set(&mal_contextLock);
503 for(c = mal_clients; c < mal_clients+MAL_MAXCLIENTS; c++)
504 if (cntxt != c){
505 if (c->mode == RUNCLIENT)
506 c->mode = FINISHCLIENT;
507 else if (c->mode == FREECLIENT)
508 c->mode = BLOCKCLIENT;
509 }
510 shutdowninprogress =1;
511 MT_lock_unset(&mal_contextLock);
512}
513
514int
515MCactiveClients(void)
516{
517 int idles = 0;
518 Client cntxt = mal_clients;
519
520 for(cntxt = mal_clients; cntxt<mal_clients+MAL_MAXCLIENTS; cntxt++){
521 idles += (cntxt->idle != 0 && cntxt->mode == RUNCLIENT);
522 }
523 return idles;
524}
525
526void
527MCcloseClient(Client c)
528{
529#ifdef MAL_DEBUG_CLIENT
530 fprintf(stderr,"closeClient %d " OIDFMT "\n", (int) (c - mal_clients), c->user);
531#endif
532 /* free resources of a single thread */
533 MCfreeClient(c);
534}
535
536str
537MCsuspendClient(int id)
538{
539 if (id < 0 || id >= MAL_MAXCLIENTS)
540 throw(INVCRED, "mal.clients", INVCRED_WRONG_ID);
541 mal_clients[id].itrace = 'S';
542 return MAL_SUCCEED;
543}
544
545str
546MCawakeClient(int id)
547{
548 if (id < 0 || id >= MAL_MAXCLIENTS)
549 throw(INVCRED, "mal.clients", INVCRED_WRONG_ID);
550 mal_clients[id].itrace = 0;
551 return MAL_SUCCEED;
552}
553
554/*
555 * Input to be processed is collected in a Client specific buffer. It
556 * is filled by reading information from a stream, a terminal, or by
557 * scheduling strings constructed internally. The latter involves
558 * removing any escape character needed to manipulate the string within
559 * the kernel. The buffer space is automatically expanded to
560 * accommodate new information and the read pointers are adjusted.
561 *
562 * The input is read from a (blocked) stream and stored in the client
563 * record input buffer. The storage area grows automatically upon need.
564 * The origin of the input stream depends on the connectivity mode.
565 *
566 * Each operation received from a front-end consists of at least one
567 * line. To simplify misaligned communication with front-ends, we use
568 * different prompts structures.
569 *
570 * The default action is to read information from an ascii-stream one
571 * line at a time. This is the preferred mode for reading from terminal.
572 *
573 * The next statement block is to be read. Send a prompt to warn the
574 * front-end to issue the request.
575 */
576int
577MCreadClient(Client c)
578{
579 bstream *in = c->fdin;
580
581#ifdef MAL_CLIENT_DEBUG
582 fprintf(stderr,"# streamClient %d %d\n", c->idx, isa_block_stream(in->s));
583#endif
584
585 while (in->pos < in->len &&
586 (isspace((unsigned char) (in->buf[in->pos])) ||
587 in->buf[in->pos] == ';' || !in->buf[in->pos]))
588 in->pos++;
589
590 if (in->pos >= in->len || in->mode) {
591 ssize_t rd, sum = 0;
592
593 if (in->eof || !isa_block_stream(c->fdout)) {
594 if (!isa_block_stream(c->fdout) && c->promptlength > 0)
595 mnstr_write(c->fdout, c->prompt, c->promptlength, 1);
596 mnstr_flush(c->fdout);
597 in->eof = false;
598 }
599 while ((rd = bstream_next(in)) > 0 && !in->eof) {
600 sum += rd;
601 if (!in->mode) /* read one line at a time in line mode */
602 break;
603 }
604 if (in->mode) { /* find last new line */
605 char *p = in->buf + in->len - 1;
606
607 while (p > in->buf && *p != '\n') {
608 *(p + 1) = *p;
609 p--;
610 }
611 if (p > in->buf)
612 *(p + 1) = 0;
613 if (p != in->buf + in->len - 1)
614 in->len++;
615 }
616#ifdef MAL_CLIENT_DEBUG
617 fprintf(stderr, "# simple stream received %d sum %zu\n", c->idx, sum);
618#endif
619 }
620 if (in->pos >= in->len) {
621 /* end of stream reached */
622#ifdef MAL_CLIENT_DEBUG
623 fprintf(stderr,"# end of stream received %d %d\n", c->idx, c->bak == 0);
624#endif
625 if (c->bak) {
626 MCpopClientInput(c);
627 if (c->fdin == NULL)
628 return 0;
629 return MCreadClient(c);
630 }
631 return 0;
632 }
633#ifdef MAL_CLIENT_DEBUG
634 fprintf(stderr,"# finished stream read %d %d\n", (int) in->pos, (int) in->len);
635 printf("#%s\n", in->buf);
636#endif
637 return 1;
638}
639
640
641int
642MCvalid(Client tc)
643{
644 Client c;
645 if (tc == NULL) {
646 return 0;
647 }
648 MT_lock_set(&mal_contextLock);
649 for (c = mal_clients; c < mal_clients + MAL_MAXCLIENTS; c++) {
650 if (c == tc && c->mode == RUNCLIENT) {
651 MT_lock_unset(&mal_contextLock);
652 return 1;
653 }
654 }
655 MT_lock_unset(&mal_contextLock);
656 return 0;
657}
658