/*
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 *
 * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
 */

/*
 * (author) M Kersten
 * Out of order execution
 * The alternative is to execute the instructions out of order
 * using dataflow dependencies and as an independent process.
 * Dataflow processing only works on a code
 * sequence that does not include additional (implicit) flow of control
 * statements and, ideally, consists of expensive BAT operations.
 * The dataflow interpreter selects cheap instructions
 * using a simple cost function based on the size of the BATs involved.
 *
 * The dataflow portion is identified as a guarded block,
 * whose entry is controlled by the function language.dataflow().
 * This way the function can inform the caller to skip the block
 * when dataflow execution has been performed.
 *
 * The flow graphs should be organized such that parallel threads can
 * access them mostly without expensive locking.
 */
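
/*
 * An illustrative (not verbatim) shape of such a guarded block in MAL:
 *
 *     barrier X_42 := language.dataflow();
 *         X_1 := algebra.projection(...);
 *         X_2 := algebra.join(...);
 *     exit X_42;
 *
 * The instructions inside the barrier are scheduled by this module;
 * the caller skips them once dataflow execution has taken place.
 */
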
#include "monetdb_config.h"
#include "mal_dataflow.h"
#include "mal_private.h"
#include "mal_runtime.h"
#include "mal_resource.h"

#define DFLOWpending 0  /* runnable */
#define DFLOWrunning 1  /* currently in progress */
#define DFLOWwrapup  2  /* done! */
#define DFLOWretry   3  /* reschedule */
#define DFLOWskipped 4  /* due to errors */

/* The per instruction status of execution */
typedef struct FLOWEVENT {
    struct DATAFLOW *flow;  /* execution context */
    int pc;                 /* pc in underlying malblock */
    int blocks;             /* number of variables still awaited */
    sht state;              /* of execution */
    lng clk;
    sht cost;
    lng hotclaim;           /* memory footprint of result variables */
    lng argclaim;           /* memory footprint of arguments */
    lng maxclaim;           /* memory footprint of largest argument, could be used to indicate result size */
} *FlowEvent, FlowEventRec;

typedef struct queue {
    int size;               /* size of queue */
    int last;               /* last element in the queue */
    int exitcount;          /* how many threads should exit */
    FlowEvent *data;
    MT_Lock l;              /* it is a shared resource, i.e. we need locks */
    MT_Sema s;              /* threads wait on empty queues */
} Queue;

/*
 * The dataflow dependency is administered in a graph list structure.
 * For each instruction we keep the list of instructions that
 * should be checked for eligibility once we are finished with it.
 */
typedef struct DATAFLOW {
    Client cntxt;           /* for debugging and client resolution */
    MalBlkPtr mb;           /* carry the context */
    MalStkPtr stk;
    int start, stop;        /* guarded block under consideration */
    FlowEvent status;       /* status of each instruction */
    ATOMIC_PTR_TYPE error;  /* error encountered */
    int *nodes;             /* dependency graph nodes */
    int *edges;             /* dependency graph */
    MT_Lock flowlock;       /* lock to protect the above */
    Queue *done;            /* instructions handled */
} *DataFlow, DataFlowRec;
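
/*
 * The nodes/edges arrays encode, per instruction, a linked list of the
 * instructions it wakes up. As an illustration (derived from DFLOWinitBlk
 * below): if instruction 2 of the block has to wake up instructions 5 and 7,
 * then nodes[2] == 5 and edges[2] points to an overflow cell e beyond the
 * instruction range, with nodes[e] == 7 and edges[e] == -1 closing the list.
 */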

static struct worker {
    MT_Id id;
    enum {IDLE, RUNNING, JOINING, EXITED} flag;
    ATOMIC_PTR_TYPE cntxt;  /* client we do work for (NULL -> any) */
    MT_Sema s;
} workers[THREADS];

static Queue *todo = 0;     /* pending instructions */

static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0);
static MT_Lock dataflowLock = MT_LOCK_INITIALIZER("dataflowLock");
static void stopMALdataflow(void);

void
mal_dataflow_reset(void)
{
    stopMALdataflow();
    memset((char*) workers, 0, sizeof(workers));
    if (todo) {
        GDKfree(todo->data);
        MT_lock_destroy(&todo->l);
        MT_sema_destroy(&todo->s);
        GDKfree(todo);
    }
    todo = 0;   /* pending instructions */
    ATOMIC_SET(&exiting, 0);
}

/*
 * Calculate the size of the dataflow dependency graph.
 */
static int
DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
{
    int cnt = 0;
    int i;

    for (i = start; i < stop; i++)
        cnt += getInstrPtr(mb, i)->argc;
    return cnt;
}

/*
 * The dataflow execution is confined to a barrier block.
 * Within the block there are multiple flows, which, in principle,
 * can be executed in parallel.
 */

static Queue*
q_create(int sz, const char *name)
{
    Queue *q = (Queue*)GDKmalloc(sizeof(Queue));

    if (q == NULL)
        return NULL;
    *q = (Queue) {
        .size = ((sz << 1) >> 1), /* we want a multiple of 2 */
    };
    q->data = (FlowEvent*) GDKmalloc(sizeof(FlowEvent) * q->size);
    if (q->data == NULL) {
        GDKfree(q);
        return NULL;
    }

    MT_lock_init(&q->l, name);
    MT_sema_init(&q->s, 0, name);
    return q;
}

static void
q_destroy(Queue *q)
{
    assert(q);
    MT_lock_destroy(&q->l);
    MT_sema_destroy(&q->s);
    GDKfree(q->data);
    GDKfree(q);
}

/* Keep a simple LIFO queue. It will not be large, so the shuffling done by a
 * requeue is affordable. */
/* We might actually sort it for better scheduling behavior. */
static void
q_enqueue_(Queue *q, FlowEvent d)
{
    assert(q);
    assert(d);
    if (q->last == q->size) {
        q->size <<= 1;
        q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * q->size);
        assert(q->data);
    }
    q->data[q->last++] = d;
}
static void
q_enqueue(Queue *q, FlowEvent d)
{
    assert(q);
    assert(d);
    MT_lock_set(&q->l);
    q_enqueue_(q, d);
    MT_lock_unset(&q->l);
    MT_sema_up(&q->s);
}

/*
 * A priority queue over the hot claims of memory may
 * be more effective. It would prioritize those instructions
 * that want to use a big recent result.
 */

static void
q_requeue_(Queue *q, FlowEvent d)
{
    int i;

    assert(q);
    assert(d);
    if (q->last == q->size) {
        /* enlarge buffer */
        q->size <<= 1;
        q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * q->size);
        assert(q->data);
    }
    for (i = q->last; i > 0; i--)
        q->data[i] = q->data[i - 1];
    q->data[0] = d;
    q->last++;
}
static void
q_requeue(Queue *q, FlowEvent d)
{
    assert(q);
    assert(d);
    MT_lock_set(&q->l);
    q_requeue_(q, d);
    MT_lock_unset(&q->l);
    MT_sema_up(&q->s);
}

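/* Dequeue a flow event. The caller blocks on the queue's semaphore until
 * work is available. With a client context we look for an entry of that
 * client, preferring the one with the lowest pc (for very long queues we
 * simply take the first eligible entry found). Without a client context we
 * pop in LIFO order, honoring exitcount requests so a worker can exit. */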
static FlowEvent
q_dequeue(Queue *q, Client cntxt)
{
    FlowEvent r = NULL, s = NULL;
    //int i;

    assert(q);
    MT_sema_down(&q->s);
    if (ATOMIC_GET(&exiting))
        return NULL;
    MT_lock_set(&q->l);
    if (cntxt) {
        int i, minpc = -1;

        for (i = q->last - 1; i >= 0; i--) {
            if (q->data[i]->flow->cntxt == cntxt) {
                if (q->last > 1024) {
                    /* for long "queues", just grab the first eligible
                     * entry we encounter */
                    minpc = i;
                    break;
                }
                /* for shorter "queues", find the oldest eligible entry */
                if (minpc < 0) {
                    minpc = i;
                    s = q->data[i];
                }
                r = q->data[i];
                if (s && r && s->pc > r->pc) {
                    minpc = i;
                    s = r;
                }
            }
        }
        if (minpc >= 0) {
            r = q->data[minpc];
            i = minpc;
            q->last--;
            memmove(q->data + i, q->data + i + 1, (q->last - i) * sizeof(q->data[0]));
        }

        MT_lock_unset(&q->l);
        return r;
    }
    if (q->exitcount > 0) {
        q->exitcount--;
        MT_lock_unset(&q->l);
        return NULL;
    }
    assert(q->last > 0);
    if (q->last > 0) {
        /* LIFO favors garbage collection */
        r = q->data[--q->last];
/* Line coverage tests show this is an expensive loop that hardly ever leads to an adjustment
        for (i = q->last - 1; r && i >= 0; i--) {
            s = q->data[i];
            if (s && s->flow && s->flow->stk &&
                r && r->flow && r->flow->stk &&
                s->flow->stk->tag < r->flow->stk->tag) {
                q->data[i] = r;
                r = s;
            }
        }
*/
        q->data[q->last] = 0;
    }
    /* else: terminating */
    /* try out random draw *
    {
        int i;
        i = rand() % q->last;
        r = q->data[i];
        for (i++; i < q->last; i++)
            q->data[i - 1] = q->data[i];
        q->last--;
    }
    */

    MT_lock_unset(&q->l);
    assert(r);
    return r;
}

/*
 * We simply move an instruction into the front of the queue.
 * Beware, we assume that variables are assigned a value only once, otherwise
 * the order may really create errors.
 * The order of the instructions should be retained as long as possible.
 * Delay processing when we run out of memory: push the instruction back
 * onto the end of the queue, waiting for another attempt. The problem may
 * then become that all threads but one cycle through the queue, each time
 * finding an eligible instruction but not enough space.
 * Therefore, we wait for a few milliseconds as an initial punishment.
 *
 * The process could be refined by checking for cheap operations,
 * i.e. those that would require no memory at all (aggr.count).
 * This, however, would lead to a dependency on the upper layers,
 * because in the kernel we do not know what routines are available
 * with this property. Nor do we maintain such properties.
 */

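/* The worker loop: dequeue an event (preferring our own client while we are
 * a client-specific worker), try to claim memory for it through the
 * admission policy, run the single instruction with runMALsequence, release
 * the claim, and wake up one dependent instruction directly while enqueueing
 * the event on the flow's done queue. */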
static void
DFLOWworker(void *T)
{
    struct worker *t = (struct worker *) T;
    DataFlow flow;
    FlowEvent fe = 0, fnxt = 0;
    int id = (int) (t - workers);
    int tid = THRgettid();
    str error = 0;
    int i, last;
    lng claim;
    Client cntxt;
    InstrPtr p;

#ifdef _MSC_VER
    srand((unsigned int) GDKusec());
#endif
    GDKsetbuf(GDKmalloc(GDKMAXERRLEN));  /* where to leave errors */
    if (GDKerrbuf == 0)
        fprintf(stderr, "DFLOWworker: could not allocate GDKerrbuf\n");
    else
        GDKclrerr();
    cntxt = ATOMIC_PTR_GET(&t->cntxt);
    if (cntxt) {
        /* wait until we are allowed to start working */
        MT_sema_down(&t->s);
    }
    while (1) {
        if (fnxt == 0) {
            MT_thread_setworking(NULL);
            cntxt = ATOMIC_PTR_GET(&t->cntxt);
            fe = q_dequeue(todo, cntxt);
            if (fe == NULL) {
                if (cntxt) {
                    /* we're not done yet with work for the current
                     * client (as far as we know), so give up the CPU
                     * and let the scheduler enter some more work, but
                     * first compensate for the down we did in
                     * dequeue */
                    MT_sema_up(&todo->s);
                    MT_sleep_ms(1);
                    continue;
                }
                /* no more work to be done: exit */
                break;
            }
            if (fe->flow->cntxt && fe->flow->cntxt->mythread)
                MT_thread_setworking(fe->flow->cntxt->mythread->name);
        } else
            fe = fnxt;
        if (ATOMIC_GET(&exiting)) {
            break;
        }
        fnxt = 0;
        assert(fe);
        flow = fe->flow;
        assert(flow);

        /* whenever we have a (concurrent) error, skip it */
        if (ATOMIC_PTR_GET(&flow->error)) {
            q_enqueue(flow->done, fe);
            continue;
        }

        p = getInstrPtr(flow->mb, fe->pc);
        claim = fe->argclaim;
        if (MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p, claim)) {
            // never block on deblockdataflow()
            if (p->fcn != (MALfcn) deblockdataflow) {
                fe->hotclaim = 0;   /* don't assume priority anymore */
                fe->maxclaim = 0;
                if (todo->last == 0)
                    MT_sleep_ms(DELAYUNIT);
                q_requeue(todo, fe);
                continue;
            }
        }
        error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
        PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT "," LLFMT " %s\n",
                         fe->pc, id, claim, fe->hotclaim, fe->maxclaim, error ? error : "");
        /* release the memory claim */
        MALadmission_release(flow->cntxt, flow->mb, flow->stk, p, claim);
        /* update the NUMA information; keep the thread id producing the value */
        p = getInstrPtr(flow->mb, fe->pc);
        for (i = 0; i < p->argc; i++)
            setVarWorker(flow->mb, getArg(p, i), tid);

        MT_lock_set(&flow->flowlock);
        fe->state = DFLOWwrapup;
        MT_lock_unset(&flow->flowlock);
        if (error) {
            void *null = NULL;
            /* only collect one error (from one thread, needed for stable testing) */
            if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
                freeException(error);
            /* after an error we skip the rest of the block */
            q_enqueue(flow->done, fe);
            continue;
        }

        /* See if we can find an eligible instruction that uses the
         * result just produced, so we can continue with it right away.
         * We only look forward from the last block, which means we are
         * safe from concurrent actions. No other thread can steal it,
         * because we hold the logical lock.
         * All eligible instructions are queued.
         */
        {
            InstrPtr p = getInstrPtr(flow->mb, fe->pc);
            assert(p);
            fe->hotclaim = 0;
            fe->maxclaim = 0;

            for (i = 0; i < p->retc; i++) {
                lng footprint;
                footprint = getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
                fe->hotclaim += footprint;
                if (footprint > fe->maxclaim)
                    fe->maxclaim = footprint;
            }
        }
        MT_lock_set(&flow->flowlock);

        for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
            if (flow->status[i].state == DFLOWpending &&
                flow->status[i].blocks == 1) {
                flow->status[i].state = DFLOWrunning;
                flow->status[i].blocks = 0;
                flow->status[i].hotclaim = fe->hotclaim;
                flow->status[i].argclaim += fe->hotclaim;
                if (flow->status[i].maxclaim < fe->maxclaim)
                    flow->status[i].maxclaim = fe->maxclaim;
                fnxt = flow->status + i;
                break;
            }
        MT_lock_unset(&flow->flowlock);

        q_enqueue(flow->done, fe);
        if (fnxt == 0 && malProfileMode) {
            int last;
            MT_lock_set(&todo->l);
            last = todo->last;
            MT_lock_unset(&todo->l);
            if (last == 0)
                profilerHeartbeatEvent("wait");
        }
    }
    GDKfree(GDKerrbuf);
    GDKsetbuf(0);
    MT_lock_set(&dataflowLock);
    t->flag = EXITED;
    MT_lock_unset(&dataflowLock);
}

/*
 * Create an interpreter pool.
 * One worker will adaptively be available for each client.
 * The remainder are taken from the GDKnr_threads argument and
 * typically equal to the number of cores.
 * The workers are assembled in a local table to enable debugging.
 */
static int
DFLOWinitialize(void)
{
    int i, limit;
    int created = 0;
    static bool first = true;

    MT_lock_set(&mal_contextLock);
    if (todo) {
        /* somebody else beat us to it */
        MT_lock_unset(&mal_contextLock);
        return 0;
    }
    todo = q_create(2048, "todo");
    if (todo == NULL) {
        MT_lock_unset(&mal_contextLock);
        return -1;
    }
    for (i = 0; i < THREADS; i++) {
        char name[16];
        snprintf(name, sizeof(name), "DFLOWsema%d", i);
        MT_sema_init(&workers[i].s, 0, name);
        workers[i].flag = IDLE;
        if (first)  /* only initialize once */
            ATOMIC_PTR_INIT(&workers[i].cntxt, NULL);
    }
    first = false;
    limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
    if (limit > THREADS)
        limit = THREADS;
    MT_lock_set(&dataflowLock);
    for (i = 0; i < limit; i++) {
        workers[i].flag = RUNNING;
        ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
        char name[16];
        snprintf(name, sizeof(name), "DFLOWworker%d", i);
        if ((workers[i].id = THRcreate(DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE, name)) == 0)
            workers[i].flag = IDLE;
        else
            created++;
    }
    MT_lock_unset(&dataflowLock);
    if (created == 0) {
        /* no threads created */
        q_destroy(todo);
        todo = NULL;
        MT_lock_unset(&mal_contextLock);
        return -1;
    }
    MT_lock_unset(&mal_contextLock);
    return 0;
}

/*
 * The dataflow administration is based on tracking how many
 * variables are still missing before an instruction can be executed.
 * For each instruction we keep a list of instructions whose
 * blocking counter should be decremented upon finishing it.
 */
static str
DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
{
    int pc, i, j, k, l, n, etop = 0;
    int *assign;
    InstrPtr p;

    if (flow == NULL)
        throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
    if (mb == NULL)
        throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
    PARDEBUG fprintf(stderr, "#Initialize dflow block\n");
    assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
    if (assign == NULL)
        throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
    etop = flow->stop - flow->start;
    for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
        p = getInstrPtr(mb, pc);
        if (p == NULL) {
            GDKfree(assign);
            throw(MAL, "dataflow", "DFLOWinitBlk(): getInstrPtr() returned NULL");
        }

        /* initial state, i.e. everything can run */
        flow->status[n].flow = flow;
        flow->status[n].pc = pc;
        flow->status[n].state = DFLOWpending;
        flow->status[n].cost = -1;
        ATOMIC_PTR_SET(&flow->status[n].flow->error, NULL);

        /* administer flow dependencies */
        for (j = p->retc; j < p->argc; j++) {
            /* list of instructions that wake the n-th instruction up */
            if (!isVarConstant(mb, getArg(p, j)) && (k = assign[getArg(p, j)])) {
                assert(k < pc); /* only dependencies on earlier instructions */
                /* add edge to the target instruction for wakeup call */
                k -= flow->start;
                if (flow->nodes[k]) {
                    /* add wakeup to tail of list */
                    for (i = k; flow->edges[i] > 0; i = flow->edges[i])
                        ;
                    flow->nodes[etop] = n;
                    flow->edges[etop] = -1;
                    flow->edges[i] = etop;
                    etop++;
                    (void) size;
                    if (etop == size) {
                        int *tmp;
                        /* in case of realloc failure, the original
                         * pointers will be freed by the caller */
                        tmp = (int*) GDKrealloc(flow->nodes, sizeof(int) * 2 * size);
                        if (tmp == NULL) {
                            GDKfree(assign);
                            throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
                        }
                        flow->nodes = tmp;
                        tmp = (int*) GDKrealloc(flow->edges, sizeof(int) * 2 * size);
                        if (tmp == NULL) {
                            GDKfree(assign);
                            throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
                        }
                        flow->edges = tmp;
                        size *= 2;
                    }
                } else {
                    flow->nodes[k] = n;
                    flow->edges[k] = -1;
                }

                flow->status[n].blocks++;
            }

            /* list of instructions to be woken up explicitly */
            if (!isVarConstant(mb, getArg(p, j))) {
                /* be careful, watch out for garbage collection interference */
                /* those should be scheduled after all their other uses */
                l = getEndScope(mb, getArg(p, j));
                if (l != pc && l < flow->stop && l > flow->start) {
                    /* add edge to the target instruction for wakeup call */
                    PARDEBUG fprintf(stderr, "#endoflife for %s is %d -> %d\n", getVarName(mb, getArg(p, j)), n + flow->start, l);
                    assert(pc < l); /* only dependencies on earlier instructions */
                    l -= flow->start;
                    if (flow->nodes[n]) {
                        /* add wakeup to tail of list */
                        for (i = n; flow->edges[i] > 0; i = flow->edges[i])
                            ;
                        flow->nodes[etop] = l;
                        flow->edges[etop] = -1;
                        flow->edges[i] = etop;
                        etop++;
                        if (etop == size) {
                            int *tmp;
                            /* in case of realloc failure, the original
                             * pointers will be freed by the caller */
                            tmp = (int*) GDKrealloc(flow->nodes, sizeof(int) * 2 * size);
                            if (tmp == NULL) {
                                GDKfree(assign);
                                throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
                            }
                            flow->nodes = tmp;
                            tmp = (int*) GDKrealloc(flow->edges, sizeof(int) * 2 * size);
                            if (tmp == NULL) {
                                GDKfree(assign);
                                throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
                            }
                            flow->edges = tmp;
                            size *= 2;
                        }
                    } else {
                        flow->nodes[n] = l;
                        flow->edges[n] = -1;
                    }
                    flow->status[l].blocks++;
                }
            }
        }

        for (j = 0; j < p->retc; j++)
            assign[getArg(p, j)] = pc;  /* ensure recognition of dependency on first instruction and constant */
    }
    GDKfree(assign);
    PARDEBUG {
        for (n = 0; n < flow->stop - flow->start; n++) {
            fprintf(stderr, "#[%d] %d: ", flow->start + n, n);
            fprintInstruction(stderr, mb, 0, getInstrPtr(mb, n + flow->start), LIST_MAL_ALL);
            fprintf(stderr, "#[%d]Dependents block count %d wakeup", flow->start + n, flow->status[n].blocks);
            for (j = n; flow->edges[j]; j = flow->edges[j]) {
                fprintf(stderr, "%d ", flow->start + flow->nodes[j]);
                if (flow->edges[j] == -1)
                    break;
            }
            fprintf(stderr, "\n");
        }
    }
    return MAL_SUCCEED;
}

/*
 * Parallel processing is mostly driven by dataflow, but within this context
 * there may be different schemes to take instructions into execution.
 * The admission scheme (and wrapup) are the necessary scheduler hooks.
 * A scheduler registers the functions needed and should release them
 * at the end of the parallel block.
 * They take effect after we have ensured that the basic properties for
 * execution hold.
 */
/*
static void showFlowEvent(DataFlow flow, int pc)
{
    int i;
    FlowEvent fe = flow->status;

    fprintf(stderr, "#end of data flow %d done %d \n", pc, flow->stop - flow->start);
    for (i = 0; i < flow->stop - flow->start; i++)
        if (fe[i].state != DFLOWwrapup && fe[i].pc >= 0) {
            fprintf(stderr, "#missed pc %d status %d %d blocks %d", fe[i].state, i, fe[i].pc, fe[i].blocks);
            fprintInstruction(stderr, fe[i].flow->mb, 0, getInstrPtr(fe[i].flow->mb, fe[i].pc), LIST_MAL_MAPI);
        }
}
*/

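/* The scheduler enqueues every instruction whose blocking counter is already
 * zero, wakes up the client-specific worker, and then waits on the flow's
 * done queue. For each finished instruction it decrements the counters of
 * its dependents and enqueues those that become runnable, until all
 * instructions of the block have been handled. */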
static str
DFLOWscheduler(DataFlow flow, struct worker *w)
{
    int last;
    int i;
    int j;
    InstrPtr p;
    int tasks = 0, actions;
    str ret = MAL_SUCCEED;
    FlowEvent fe, f = 0;

    if (flow == NULL)
        throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
    actions = flow->stop - flow->start;
    if (actions == 0)
        throw(MAL, "dataflow", "Empty dataflow block");
    /* initialize the eligible statements */
    fe = flow->status;

    MT_lock_set(&flow->flowlock);
    for (i = 0; i < actions; i++)
        if (fe[i].blocks == 0) {
            p = getInstrPtr(flow->mb, fe[i].pc);
            if (p == NULL) {
                MT_lock_unset(&flow->flowlock);
                throw(MAL, "dataflow", "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
            }
            for (j = p->retc; j < p->argc; j++)
                fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j, FALSE);
            q_enqueue(todo, flow->status + i);
            flow->status[i].state = DFLOWrunning;
            PARDEBUG fprintf(stderr, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
        }
    MT_lock_unset(&flow->flowlock);
    MT_sema_up(&w->s);

    PARDEBUG fprintf(stderr, "#run %d instructions in dataflow block\n", actions);

    while (actions != tasks) {
        f = q_dequeue(flow->done, NULL);
        if (ATOMIC_GET(&exiting))
            break;
        if (f == NULL)
            throw(MAL, "dataflow", "DFLOWscheduler(): q_dequeue(flow->done) returned NULL");

        /*
         * When an instruction is finished, we have to reduce the blocked
         * counter of all dependent instructions. For those where it
         * drops to zero, we can schedule them; we do that right here.
         */

        MT_lock_set(&flow->flowlock);
        tasks++;
        for (last = f->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
            if (flow->status[i].state == DFLOWpending) {
                flow->status[i].argclaim += f->hotclaim;
                if (flow->status[i].blocks == 1) {
                    flow->status[i].state = DFLOWrunning;
                    flow->status[i].blocks--;
                    q_enqueue(todo, flow->status + i);
                    PARDEBUG fprintf(stderr, "#enqueue pc=%d claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                } else {
                    flow->status[i].blocks--;
                }
            }
        MT_lock_unset(&flow->flowlock);
    }
    /* release the worker from its specific task (turn it into a
     * generic worker) */
    ATOMIC_PTR_SET(&w->cntxt, NULL);
    /* wrap up errors */
    assert(flow->done->last == 0);
    if ((ret = ATOMIC_PTR_XCG(&flow->error, NULL)) != NULL) {
        PARDEBUG fprintf(stderr, "#errors encountered %s ", ret);
    }
    return ret;
}

/* We create a pool of GDKnr_threads-1 generic workers, that is,
 * workers that will take on jobs from any clients. In addition, we
 * create a single specific worker per client (i.e. each time we enter
 * here). This specific worker will only do work for the client for
 * which it was started. In this way we can guarantee that there will
 * always be progress for the client, even if all other workers are
 * doing something big.
 *
 * When all jobs for a client have been done (there are no more
 * entries for the client in the queue), the specific worker turns
 * itself into a generic worker. At the same time, we signal that one
 * generic worker should exit and this function returns. In this way
 * we make sure that there are once again GDKnr_threads-1 generic
 * workers. */
str
runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr stk)
{
    DataFlow flow = NULL;
    str msg = MAL_SUCCEED;
    int size;
    bit *ret;
    int i;

#ifdef DEBUG_FLOW
    fprintf(stderr, "#runMALdataflow for block %d - %d\n", startpc, stoppc);
    fprintFunction(stderr, mb, 0, LIST_ALL);
#endif

    /* in debugging mode we should not start multiple threads */
    if (stk == NULL)
        throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
    ret = getArgReference_bit(stk, getInstrPtr(mb, startpc), 0);
    *ret = FALSE;
    if (stk->cmd) {
        *ret = TRUE;
        return MAL_SUCCEED;
    }

    assert(stoppc > startpc);

    /* check existence of workers */
    if (todo == NULL) {
        /* create thread pool */
        if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
            /* no threads created, run serially */
            *ret = TRUE;
            return MAL_SUCCEED;
        }
    }
    assert(todo);
    /* in addition, create one more worker that will only execute
     * tasks for the current client to compensate for our waiting
     * until all work is done */
    MT_lock_set(&dataflowLock);
    /* join with already exited threads */
    {
        int joined;
        do {
            joined = 0;
            for (i = 0; i < THREADS; i++) {
                if (workers[i].flag == EXITED) {
                    workers[i].flag = JOINING;
                    ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
                    joined = 1;
                    MT_lock_unset(&dataflowLock);
                    MT_join_thread(workers[i].id);
                    MT_lock_set(&dataflowLock);
                    workers[i].flag = IDLE;
                }
            }
        } while (joined);
    }
    for (i = 0; i < THREADS; i++) {
        if (workers[i].flag == IDLE) {
            /* only create specific worker if we are not doing a
             * recursive call */
            if (stk->calldepth > 1) {
                int j;
                MT_Id pid = MT_getpid();

                /* doing a recursive call: copy specificity from
                 * current worker to new worker */
                ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
                for (j = 0; j < THREADS; j++) {
                    if (workers[j].flag == RUNNING && workers[j].id == pid) {
                        ATOMIC_PTR_SET(&workers[i].cntxt,
                                       ATOMIC_PTR_GET(&workers[j].cntxt));
                        break;
                    }
                }
            } else {
                /* not doing a recursive call: create specific worker */
                ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
            }
            workers[i].flag = RUNNING;
            char name[16];
            snprintf(name, sizeof(name), "DFLOWworker%d", i);
            if ((workers[i].id = THRcreate(DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE, name)) == 0) {
                /* cannot start new thread, run serially */
                *ret = TRUE;
                workers[i].flag = IDLE;
                MT_lock_unset(&dataflowLock);
                return MAL_SUCCEED;
            }
            break;
        }
    }
    MT_lock_unset(&dataflowLock);
    if (i == THREADS) {
        /* no empty thread slots found, run serially */
        *ret = TRUE;
        return MAL_SUCCEED;
    }

    flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
    if (flow == NULL)
        throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);

    flow->cntxt = cntxt;
    flow->mb = mb;
    flow->stk = stk;

    /* keep real block count, exclude brackets */
    flow->start = startpc + 1;
    flow->stop = stoppc;

    flow->done = q_create(stoppc - startpc + 1, "flow->done");
    if (flow->done == NULL) {
        GDKfree(flow);
        throw(MAL, "dataflow", "runMALdataflow(): Failed to create flow->done queue");
    }

    flow->status = (FlowEvent)GDKzalloc((stoppc - startpc + 1) * sizeof(FlowEventRec));
    if (flow->status == NULL) {
        q_destroy(flow->done);
        GDKfree(flow);
        throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
    }
    size = DFLOWgraphSize(mb, startpc, stoppc);
    size += stoppc - startpc;
    flow->nodes = (int*)GDKzalloc(sizeof(int) * size);
    if (flow->nodes == NULL) {
        GDKfree(flow->status);
        q_destroy(flow->done);
        GDKfree(flow);
        throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
    }
    flow->edges = (int*)GDKzalloc(sizeof(int) * size);
    if (flow->edges == NULL) {
        GDKfree(flow->nodes);
        GDKfree(flow->status);
        q_destroy(flow->done);
        GDKfree(flow);
        throw(MAL, "dataflow", SQLSTATE(HY001) MAL_MALLOC_FAIL);
    }
    MT_lock_init(&flow->flowlock, "flow->flowlock");
    ATOMIC_PTR_INIT(&flow->error, NULL);
    msg = DFLOWinitBlk(flow, mb, size);

    if (msg == MAL_SUCCEED)
        msg = DFLOWscheduler(flow, &workers[i]);

    GDKfree(flow->status);
    GDKfree(flow->edges);
    GDKfree(flow->nodes);
    q_destroy(flow->done);
    MT_lock_destroy(&flow->flowlock);
    ATOMIC_PTR_DESTROY(&flow->error);
    GDKfree(flow);

    /* we created one worker, now tell one worker to exit again */
    MT_lock_set(&todo->l);
    todo->exitcount++;
    MT_lock_unset(&todo->l);
    MT_sema_up(&todo->s);

    return msg;
}

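/* A pass-through instruction: it just copies its argument to its result.
 * Its only purpose is to give the admission policy a hook it must never
 * block on (see the p->fcn check in DFLOWworker above). */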
str
deblockdataflow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
    int *ret = getArgReference_int(stk, pci, 0);
    int *val = getArgReference_int(stk, pci, 1);
    (void) cntxt;
    (void) mb;
    *ret = *val;
    return MAL_SUCCEED;
}

static void
stopMALdataflow(void)
{
    int i;

    ATOMIC_SET(&exiting, 1);
    if (todo) {
        MT_lock_set(&dataflowLock);
        for (i = 0; i < THREADS; i++)
            if (workers[i].flag == RUNNING)
                MT_sema_up(&todo->s);
        for (i = 0; i < THREADS; i++) {
            if (workers[i].flag != IDLE && workers[i].flag != JOINING) {
                workers[i].flag = JOINING;
                MT_lock_unset(&dataflowLock);
                MT_join_thread(workers[i].id);
                MT_lock_set(&dataflowLock);
            }
            workers[i].flag = IDLE;
            MT_sema_destroy(&workers[i].s);
        }
        MT_lock_unset(&dataflowLock);
    }
}