Martin,
Also, by adding and exporting the new functions stopMALdataflow & CMDcallFunction, this changeset changes the API, and thus must not remain as-is in the Feb2013 release branch.
Are these two new functions indeed inherently required for fixing bug 3346?
At least I cannot even find any place where these new functions are used / called ...
Thanks! Stefan
----- Original Message -----
Martin,
could you please also add sql/test/BugTracker-2013/Tests/nestedcalls.sql ?
And could you please also add the test scripts and stable output for the tests sql/test/BugTracker-2013/Tests/oid_handling, sql/test/BugTracker-2013/Tests/constraint_checking.Bug_3335, sql/test/BugTracker-2013/Tests/pivot.Bug-3339, and sql/test/BugTracker-2013/Tests/recursion, or alternatively remove them from sql/test/BugTracker-2013/Tests/All?
Thanks!
In any case, propagation of this checkin will result in (expected/unavoidable) conflicts in monetdb5/mal/mal_dataflow.[ch] and (avoidable) conflicts in sql/test/BugTracker-2013/Tests
All of these will need to be solved by hand ...
Stefan
----- Original Message -----
Changeset: 489815265a61 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=489815265a61 Added Files: sql/test/BugTracker-2013/Tests/nestedcalls.stable.err sql/test/BugTracker-2013/Tests/nestedcalls.stable.out Modified Files: monetdb5/mal/mal_dataflow.c monetdb5/mal/mal_dataflow.h monetdb5/modules/mal/language.c monetdb5/modules/mal/language.h sql/test/BugTracker-2013/Tests/All Branch: Feb2013 Log Message:
Organize parallelism per worker pool. Each (recursive) dataflow block is handed a worker pool now. Once we run out of worker pools or fail to create a worker, we continue in sequential mode.
Cleaning up the threads is implicit, like all other non-pool interpreters that might be active at system shutdown.
diffs (truncated from 762 to 300 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -35,6 +35,7 @@
- The flow graphs should be organized such that parallel threads can
- access it mostly without expensive locking.
*/ +#include "monetdb_config.h" #include "mal_dataflow.h" #include "mal_client.h"
@@ -82,10 +83,10 @@ typedef struct DATAFLOW { Queue *done; /* instructions handled */ } *DataFlow, DataFlowRec;
-#define MAXQ 1024 -static MT_Id workers[THREADS] = {0}; -static int workerqueue[THREADS] = {0}; /* maps workers towards the todo queues */ -static Queue *todo[MAXQ] = {0}; /* pending instructions organized by user MAXTODO > #users */ +#define MAXQ 256 +static Queue *todos[MAXQ] = {0}; /* pending instructions organized by dataflow block */ +static bit occupied[MAXQ]={0}; /* worker pool is in use? */ +static int volatile exiting = 0;
/*
- Calculate the size of the dataflow dependency graph.
@@ -108,9 +109,8 @@ DFLOWgraphSize(MalBlkPtr mb, int start, */
static Queue* -q_create(int sz) +q_create(int sz, const char *name) {
const char* name = "q_create"; Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
if (q == NULL)
@@ -208,6 +208,8 @@ q_dequeue(Queue *q)
assert(q); MT_sema_down(&q->s, "q_dequeue");
- if (exiting)
MT_lock_set(&q->l, "q_dequeue"); assert(q->last > 0); if (q->last > 0) {return NULL;
@@ -255,25 +257,23 @@ DFLOWworker(void *t) { DataFlow flow; FlowEvent fe = 0, fnxt = 0;
- int id = (int) ((MT_Id *) t - workers), last = 0;
- int wq; Thread thr; str error = 0;
- int i;
- lng usec = 0;
Queue *todo = *(Queue **) t;
int i,last;
thr = THRnew("DFLOWworker");
GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */ GDKerrbuf[0] = 0; while (1) {
assert(workerqueue[id] > 0);
if (fnxt == 0)wq = workerqueue[id] - 1;
fe = q_dequeue(todo[wq]);
else fe = fnxt;fe = q_dequeue(todo);
if (exiting) {
break;
fnxt = 0; assert(fe); flow = fe->flow;}
@@ -285,22 +285,20 @@ DFLOWworker(void *t) continue; }
/* skip all instructions when we have encontered an error */ if (flow->error == 0) {usec = GDKusec();
#ifdef USE_MAL_ADMISSION if (MALadmission(fe->argclaim, fe->hotclaim)) { fe->hotclaim = 0; /* don't assume priority anymore */
assert(todo[wq]);
if (todo[wq]->last == 0)
if (todo->last == 0) MT_sleep_ms(DELAYUNIT);
q_requeue(todo[wq], fe);
q_requeue(todo, fe); continue; }
#endif error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0); PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim,
error ? error : ""); #ifdef USE_MAL_ADMISSION /* release the memory claim */ MALadmission(-fe->argclaim, -fe->hotclaim); @@ -331,8 +329,8 @@ DFLOWworker(void *t) InstrPtr p = getInstrPtr(flow->mb, fe->pc); assert(p); fe->hotclaim = 0;
for (i = 0; i < p->retc; i++)
fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE);
//for (i = 0; i < p->retc; i++)
}//fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
#endif MT_lock_set(&flow->flowlock, "MALworker"); @@ -351,56 +349,64 @@ DFLOWworker(void *t)
q_enqueue(flow->done, fe); if ( fnxt == 0) {
assert(todo[wq]);
if (todo[wq]->last == 0)
if (todo->last == 0) profilerHeartbeatEvent("wait");
else
} } GDKfree(GDKerrbuf); GDKsetbuf(0);MALresourceFairness(NULL, NULL, usec);
- workerqueue[wq] = 0;
- workers[wq] = 0; THRdel(thr);
}
/*
- Create a set of DFLOW interpreters.
- Create an interpreter pool.
- One worker will adaptively be available for each client.
- The remainder are taken from the GDKnr_threads argument and
- typically is equal to the number of cores
- typically is equal to the number of cores.
- A recursive MAL function call would make for one worker less,
- which limits the number of cores for parallel processing.
- The workers are assembled in a local table to enable debugging.
- BEWARE, failure to create a new worker thread is not an error
*/
- but would lead to serial execution.
-static void -DFLOWinitialize(int index) +static int +DFLOWinitialize(void) {
- int i, worker, limit;
- int i, threads, grp;
- MT_Id worker;
- assert(index >= 0);
- assert(index < THREADS);
- threads = GDKnr_threads ? GDKnr_threads : 1; MT_lock_set(&mal_contextLock, "DFLOWinitialize");
- if (todo[index]) {
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
return;
- for(grp = 0; grp< MAXQ; grp++)
if ( occupied[grp] == FALSE){
occupied[grp] = TRUE;
break;
}
- MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- if (grp > THREADS) {
// continue non-parallel
}return -1;
- todo[index] = q_create(2048);
- assert(todo[index]);
- limit = GDKnr_threads ? GDKnr_threads : 1;
- assert(limit <= THREADS);
- for (worker = 0, i = 0; i < limit; i++){
for (; worker < THREADS; worker++)
if( workers[worker] == 0)
break;
assert(worker < THREADS);
if (worker < THREADS) {
assert(workers[worker] == 0);
MT_create_thread(&workers[worker], DFLOWworker, (void *)
&workers[worker], MT_THR_JOINABLE);
assert(workers[worker] > 0);
workerqueue[worker] = index + 1;
- if ( todos[grp] )
return grp;
- todos[grp] = q_create(2048, "todo");
- if (todos[grp] == NULL)
return -1;
- // associate a set of workers with the pool
- for (i = 0; grp>= 0 && i < threads; i++){
if (MT_create_thread(&worker, DFLOWworker, (void *) &todos[grp],
MT_THR_JOINABLE) < 0) {
//Can not create interpreter thread
grp = -1;
}
if (worker == 0) {
//Failed to create interpreter thread
} }grp = -1;
- MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- return grp;
}
/* @@ -409,18 +415,28 @@ DFLOWinitialize(int index)
- For each instruction we keep a list of instructions whose
- blocking counter should be decremented upon finishing it.
*/ -static void +static str DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size) { int pc, i, j, k, l, n, etop = 0; int *assign; InstrPtr p;
if (flow == NULL)
throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
if (mb == NULL)
throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
PARDEBUG printf("Initialize dflow block\n"); assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
if (assign == NULL)
throw(MAL, "dataflow", "DFLOWinitBlk(): Failed to allocate assign");
etop = flow->stop - flow->start; for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) { p = getInstrPtr(mb, pc);
if (p == NULL) {
GDKfree(assign);
throw(MAL, "dataflow", "DFLOWinitBlk(): getInstrPtr() returned NULL");
}
/* initial state, ie everything can run */ flow->status[n].flow = flow;
@@ -501,6 +517,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb #ifdef USE_MAL_ADMISSION memorypool = memoryclaims = 0; #endif
- return MAL_SUCCEED;
}
/* @@ -528,18 +545,17 @@ static void showFlowEvent(DataFlow flow, */
static str -DFLOWscheduler(DataFlow flow) +DFLOWscheduler(DataFlow flow, Queue *todo) { int last; int i; #ifdef USE_MAL_ADMISSION
- int j;
- //int j; InstrPtr p;
#endif int tasks=0, actions; str ret = MAL_SUCCEED; FlowEvent fe, f = 0;
int wq;
if (flow == NULL) throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
@@ -549,19 +565,19 @@ DFLOWscheduler(DataFlow flow) /* initialize the eligible statements */ fe = flow->status;
- if (fe[0].flow->cntxt->flags & timerFlag)
fe[0].flow->cntxt->timer = GDKusec();
- MT_lock_set(&flow->flowlock, "MALworker");
- wq = flow->cntxt->idx; for (i = 0; i < actions; i++) if (fe[i].blocks == 0) {
#ifdef USE_MAL_ADMISSION p = getInstrPtr(flow->mb,fe[i].pc);
for (j = p->retc; j < p->argc; j++)
fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk,
fe[i].pc, j, FALSE);
if (p == NULL) {
MT_lock_unset(&flow->flowlock, "MALworker");
throw(MAL, "dataflow", "DFLOWscheduler():
getInstrPtr(flow->mb,fe[i].pc) returned NULL");
}
//for (j = p->retc; j < p->argc; j++)
//fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p,
j, FALSE); #endif
q_enqueue(todo[wq], flow->status + i);
}q_enqueue(todo, flow->status + i); flow->status[i].state = DFLOWrunning; PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
@@ -571,6 +587,10 @@ DFLOWscheduler(DataFlow flow)
while (actions != tasks ) { _______________________________________________ checkin-list mailing list checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list
-- | Stefan.Manegold@CWI.nl | DB Architectures (DA) | | www.CWI.nl/~manegold/ | Science Park 123 (L321) | | +31 (0)20 592-4212 | 1098 XG Amsterdam (NL) |
developers-list mailing list developers-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/developers-list