1/*
2 * This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 *
6 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
7 */
8
9/* (author) M.L. Kersten
10 */
11#include "monetdb_config.h"
12#include "mal_resource.h"
13#include "mal_private.h"
14
15/* Memory based admission does not seem to have a major impact so far. */
16static lng memorypool = 0; /* memory claimed by concurrent threads */
17
18void
19mal_resource_reset(void)
20{
21 memorypool = (lng) MEMORY_THRESHOLD;
22}
23/*
24 * Running all eligible instructions in parallel creates
25 * resource contention. This means we should implement
26 * an admission control scheme where threads are temporarily
27 * postponed if the claim for memory exceeds a threshold
28 * In general such contentions will be hard to predict,
29 * because they depend on the algorithm, the input sizes,
30 * concurrent use of the same variables, and the output produced.
31 *
32 * One heuristic is based on calculating the storage footprint
33 * of the operands and assuming it preferrably should fit in memory.
34 * Ofcourse, there may be intermediate structures being
35 * used and the size of the result is not a priori known.
36 * For this, we use a high watermark on the amount of
37 * physical memory we pre-allocate for the claims.
38 *
39 * Instructions are eligible to be executed when the
40 * total footprint of all concurrent executions stays below
41 * the high-watermark or it is the single expensive
42 * instruction being started.
43 *
44 * When we run out of memory, the instruction is delayed.
45 * How long depends on the other instructions to free up
46 * resources. The current policy simple takes a local
47 * decision by delaying the instruction based on its
48 * claim of the memory.
49 */
50
51/*
52 * The memory claim is the estimate for the amount of memory hold.
53 * Views are consider cheap and ignored.
54 * Given that auxiliary structures are important for performance,
55 * we use their maximum as an indication of the memory footprint.
56 * An alternative would be to focus solely on the base table cost.
57 * (Good for a MSc study)
58 */
59lng
60getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag)
61{
62 lng total = 0, itotal = 0, t;
63 BAT *b;
64
65 (void)mb;
66 if (stk->stk[getArg(pci, i)].vtype == TYPE_bat) {
67 b = BATdescriptor( stk->stk[getArg(pci, i)].val.bval);
68 if (b == NULL)
69 return 0;
70 if (flag && isVIEW(b)) {
71 BBPunfix(b->batCacheid);
72 return 0;
73 }
74
75 /* calculate the basic scan size */
76 total += BATcount(b) * b->twidth;
77 total += heapinfo(b->tvheap, b->batCacheid);
78
79 /* indices should help, find their maximum footprint */
80 itotal = hashinfo(b->thash, d->batCacheid);
81 t = IMPSimprintsize(b);
82 if( t > itotal)
83 itotal = t;
84 /* We should also consider the ordered index and mosaic */
85 //total = total > (lng)(MEMORY_THRESHOLD ) ? (lng)(MEMORY_THRESHOLD ) : total;
86 BBPunfix(b->batCacheid);
87 if ( total < itotal)
88 total = itotal;
89 }
90 return total;
91}
92
93/*
94 * The argclaim provides a hint on how much we actually may need to execute
95 *
96 * The client context also keeps bounds on the memory claim/client.
97 * Surpassing this bound may be a reason to not admit the instruction to proceed.
98 */
99static MT_Lock admissionLock = MT_LOCK_INITIALIZER("admissionLock");
100
101int
102MALadmission_claim(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, lng argclaim)
103{
104 (void) mb;
105 (void) pci;
106 if (argclaim == 0)
107 return 0;
108
109 MT_lock_set(&admissionLock);
110 /* Check if we are allowed to allocate another worker thread for this client */
111 /* It is somewhat tricky, because we may be in a dataflow recursion, each of which should be counted for.
112 * A way out is to attach the thread count to the MAL stacks, which just limits the level
113 * of parallism for a single dataflow graph.
114 */
115 if(cntxt->workerlimit && cntxt->workerlimit < stk->workers){
116 PARDEBUG
117 fprintf(stderr, "#DFLOWadmit worker limit reached, %d <= %d\n", cntxt->workerlimit, stk->workers);
118 MT_lock_unset(&admissionLock);
119 return -1;
120 }
121 /* Determine if the total memory resource is exhausted, because it is overall limitation. */
122 if ( memorypool <= 0){
123 // we accidently released too much memory or need to initialize
124 PARDEBUG
125 fprintf(stderr, "#DFLOWadmit memorypool reset ");
126 memorypool = (lng) MEMORY_THRESHOLD;
127 }
128
129 /* the argument claim is based on the input for an instruction */
130 if ( memorypool > argclaim || stk->workers == 0 ) {
131 /* If we are low on memory resources, limit the user if he exceeds his memory budget
132 * but make sure there is at least one worker thread active */
133 /* on hold until after experiments
134 if ( 0 && cntxt->memorylimit) {
135 if (argclaim + stk->memory > (lng) cntxt->memorylimit * LL_CONSTANT(1048576)){
136 MT_lock_unset(&admissionLock);
137 PARDEBUG
138 fprintf(stderr, "#Delayed due to lack of session memory " LLFMT " requested "LLFMT"\n",
139 stk->memory, argclaim);
140 return -1;
141 }
142 stk->memory += argclaim;
143 }
144 */
145 memorypool -= argclaim;
146 PARDEBUG
147 fprintf(stderr, "#DFLOWadmit thread %d pool " LLFMT "claims " LLFMT "\n",
148 THRgettid(), memorypool, argclaim);
149 stk->workers++;
150 MT_lock_unset(&admissionLock);
151 return 0;
152 }
153 PARDEBUG
154 fprintf(stderr, "#Delayed due to lack of memory " LLFMT " requested " LLFMT "\n",
155 memorypool, argclaim);
156 MT_lock_unset(&admissionLock);
157 return -1;
158}
159
160void
161MALadmission_release(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, lng argclaim)
162{
163 /* release memory claimed before */
164 (void) cntxt;
165 (void) mb;
166 (void) pci;
167 if (argclaim == 0 )
168 return;
169
170 MT_lock_set(&admissionLock);
171 /* on hold until after experiments
172 if ( 0 && cntxt->memorylimit) {
173 PARDEBUG
174 fprintf(stderr, "#Return memory to session budget " LLFMT "\n", stk->memory);
175 stk->memory -= argclaim;
176 }
177 */
178 memorypool += argclaim;
179 if ( memorypool > (lng) MEMORY_THRESHOLD ){
180 PARDEBUG
181 fprintf(stderr, "#DFLOWadmit memorypool reset ");
182 memorypool = (lng) MEMORY_THRESHOLD;
183 }
184 stk->workers--;
185 PARDEBUG
186 fprintf(stderr, "#DFLOWadmit thread %d pool " LLFMT " claims " LLFMT "\n",
187 THRgettid(), memorypool, argclaim);
188 MT_lock_unset(&admissionLock);
189 return;
190}
191