1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9#include "monetdb_config.h"
10#include "sysmon.h"
11#include "mal_authorize.h"
12#include "mal_runtime.h"
13#include "mtime.h"
14
15/* (c) M.L. Kersten
16 * The queries currently in execution are returned to the front-end for managing expensive ones.
17*/
18
19str
20SYSMONqueue(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
21{
22 BAT *tag, *sessionid, *user, *started, *status, *query, *progress, *workers, *memory;
23 bat *t = getArgReference_bat(stk,pci,0);
24 bat *s = getArgReference_bat(stk,pci,1);
25 bat *u = getArgReference_bat(stk,pci,2);
26 bat *sd = getArgReference_bat(stk,pci,3);
27 bat *ss = getArgReference_bat(stk,pci,4);
28 bat *q = getArgReference_bat(stk,pci,5);
29 bat *p = getArgReference_bat(stk,pci,6);
30 bat *w = getArgReference_bat(stk,pci,7);
31 bat *m = getArgReference_bat(stk,pci,8);
32 lng i, qtag;
33 int wrk, mem;
34 str usr;
35 timestamp tsn;
36 str msg = MAL_SUCCEED;
37
38 (void) cntxt;
39 (void) mb;
40 tag = COLnew(0, TYPE_lng, 256, TRANSIENT);
41 sessionid = COLnew(0, TYPE_int, 256, TRANSIENT);
42 user = COLnew(0, TYPE_str, 256, TRANSIENT);
43 started = COLnew(0, TYPE_timestamp, 256, TRANSIENT);
44 status = COLnew(0, TYPE_str, 256, TRANSIENT);
45 query = COLnew(0, TYPE_str, 256, TRANSIENT);
46 progress = COLnew(0, TYPE_int, 256, TRANSIENT);
47 workers = COLnew(0, TYPE_int, 256, TRANSIENT);
48 memory = COLnew(0, TYPE_int, 256, TRANSIENT);
49 if ( tag == NULL || sessionid == NULL || user == NULL || query == NULL || started == NULL || progress == NULL || workers == NULL || memory == NULL){
50 BBPreclaim(tag);
51 BBPreclaim(sessionid);
52 BBPreclaim(user);
53 BBPreclaim(started);
54 BBPreclaim(status);
55 BBPreclaim(query);
56 BBPreclaim(progress);
57 BBPreclaim(workers);
58 BBPreclaim(memory);
59 throw(MAL, "SYSMONqueue", SQLSTATE(HY001) MAL_MALLOC_FAIL);
60 }
61
62 MT_lock_set(&mal_delayLock);
63 for ( i = 0; i< qtop; i++)
64 if( QRYqueue[i].query && (QRYqueue[i].cntxt->user == MAL_ADMIN || QRYqueue[i].cntxt->user == cntxt->user)) {
65 qtag = (lng) QRYqueue[i].tag;
66 if (BUNappend(tag, &qtag, false) != GDK_SUCCEED)
67 goto bailout;
68 msg = AUTHgetUsername(&usr, QRYqueue[i].cntxt);
69 if (msg != MAL_SUCCEED)
70 goto bailout;
71
72 if (BUNappend(sessionid, &(QRYqueue[i].cntxt->idx), false) != GDK_SUCCEED) {
73 GDKfree(usr);
74 goto bailout;
75 }
76
77 if (BUNappend(user, usr, false) != GDK_SUCCEED) {
78 GDKfree(usr);
79 goto bailout;
80 }
81 GDKfree(usr);
82 if (BUNappend(query, QRYqueue[i].query, false) != GDK_SUCCEED ||
83 BUNappend(status, QRYqueue[i].status, false) != GDK_SUCCEED)
84 goto bailout;
85
86 /* convert number of seconds into a timestamp */
87 tsn = timestamp_fromtime(QRYqueue[i].start);
88 if (is_timestamp_nil(tsn)) {
89 msg = createException(MAL, "SYSMONqueue", SQLSTATE(22003) "cannot convert time");
90 goto bailout;
91 }
92 if (BUNappend(started, &tsn, false) != GDK_SUCCEED)
93 goto bailout;
94
95 wrk = QRYqueue[i].stk->workers;
96 mem = (int) (QRYqueue[i].stk->memory / LL_CONSTANT(1048576)); /* Convert to MB */
97 if (BUNappend(progress, &QRYqueue[i].progress, false) != GDK_SUCCEED ||
98 BUNappend(workers, &wrk, false) != GDK_SUCCEED ||
99 BUNappend(memory, &mem, false) != GDK_SUCCEED)
100 goto bailout;
101 }
102 MT_lock_unset(&mal_delayLock);
103 BBPkeepref( *t =tag->batCacheid);
104 BBPkeepref( *s =sessionid->batCacheid);
105 BBPkeepref( *u =user->batCacheid);
106 BBPkeepref( *sd =started->batCacheid);
107 BBPkeepref( *ss =status->batCacheid);
108 BBPkeepref( *q =query->batCacheid);
109 BBPkeepref( *p =progress->batCacheid);
110 BBPkeepref( *w =workers->batCacheid);
111 BBPkeepref( *m =memory->batCacheid);
112 return MAL_SUCCEED;
113
114 bailout:
115 MT_lock_unset(&mal_delayLock);
116 BBPunfix(tag->batCacheid);
117 BBPunfix(sessionid->batCacheid);
118 BBPunfix(user->batCacheid);
119 BBPunfix(started->batCacheid);
120 BBPunfix(status->batCacheid);
121 BBPunfix(query->batCacheid);
122 BBPunfix(progress->batCacheid);
123 BBPunfix(workers->batCacheid);
124 BBPunfix(memory->batCacheid);
125 return msg ? msg : createException(MAL, "SYSMONqueue", SQLSTATE(HY001) MAL_MALLOC_FAIL);
126}
127
128str
129SYSMONpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
130{ lng i, tag = 0;
131 (void) mb;
132 (void) stk;
133 (void) pci;
134
135 switch( getArgType(mb,pci,1)){
136 case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
137 case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
138 case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
139 case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
140#ifdef HAVE_HGE
141 case TYPE_hge:
142 /* Does this happen?
143 * If so, what do we have TODO ? */
144 throw(MAL, "SYSMONpause", "type hge not handled, yet");
145#endif
146 default:
147 throw(MAL, "SYSMONpause", "Pause requires integer");
148 }
149 MT_lock_set(&mal_delayLock);
150 for ( i = 0; QRYqueue[i].tag; i++)
151 if( (lng) QRYqueue[i].tag == tag && (QRYqueue[i].cntxt->user == cntxt->user || cntxt->user == MAL_ADMIN)){
152 QRYqueue[i].stk->status = 'p';
153 QRYqueue[i].status = "paused";
154 }
155 MT_lock_unset(&mal_delayLock);
156 return MAL_SUCCEED;
157}
158
159str
160SYSMONresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
161{ lng i,tag = 0;
162 (void) mb;
163 (void) stk;
164 (void) pci;
165
166 switch( getArgType(mb,pci,1)){
167 case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
168 case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
169 case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
170 case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
171#ifdef HAVE_HGE
172 case TYPE_hge:
173 /* Does this happen?
174 * If so, what do we have TODO ? */
175 throw(MAL, "SYSMONresume", "type hge not handled, yet");
176#endif
177 default:
178 throw(MAL, "SYSMONresume", "Resume requires integer");
179 }
180 MT_lock_set(&mal_delayLock);
181 for ( i = 0; QRYqueue[i].tag; i++)
182 if( (lng)QRYqueue[i].tag == tag && (QRYqueue[i].cntxt->user == cntxt->user || cntxt->user == MAL_ADMIN)){
183 QRYqueue[i].stk->status = 0;
184 QRYqueue[i].status = "running";
185 }
186 MT_lock_unset(&mal_delayLock);
187 return MAL_SUCCEED;
188}
189
190str
191SYSMONstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
192{ lng i,tag = 0;
193 (void) mb;
194 (void) stk;
195 (void) pci;
196
197 switch( getArgType(mb,pci,1)){
198 case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
199 case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
200 case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
201 case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
202#ifdef HAVE_HGE
203 case TYPE_hge:
204 /* Does this happen?
205 * If so, what do we have TODO ? */
206 throw(MAL, "SYSMONstop", "type hge not handled, yet");
207#endif
208 default:
209 throw(MAL, "SYSMONstop", "Stop requires integer");
210 }
211 MT_lock_set(&mal_delayLock);
212 for ( i = 0; QRYqueue[i].tag; i++)
213 if( (lng) QRYqueue[i].tag == tag && (QRYqueue[i].cntxt->user == cntxt->user || cntxt->user == MAL_ADMIN)){
214 QRYqueue[i].stk->status = 'q';
215 QRYqueue[i].status = "stopping";
216 }
217 MT_lock_unset(&mal_delayLock);
218 return MAL_SUCCEED;
219}
220