MonetDB: Feb2013 - Organize parallelism per worker pool

Stefan Manegold Stefan.Manegold at cwi.nl
Wed Aug 21 10:42:01 CEST 2013


Martin,

Also, by adding and exporting the new functions stopMALdataflow & CMDcallFunction,
this changeset changes the API,
and thus must not remain as-is in the Feb2013 release branch.

Are these two new functions indeed inherently required for fixing bug 3346?

At least, I cannot find any place where these new functions are used or called ...

Thanks!
Stefan

----- Original Message -----
> Martin,
> 
> could you please also add
> sql/test/BugTracker-2013/Tests/nestedcalls.sql ?
> 
> And
> could you please also add the test scripts and stable output for tests
> sql/test/BugTracker-2013/Tests/oid_handling
> sql/test/BugTracker-2013/Tests/constraint_checking.Bug_3335
> sql/test/BugTracker-2013/Tests/pivot.Bug-3339
> sql/test/BugTracker-2013/Tests/recursion
> or
> alternatively remove them from
> sql/test/BugTracker-2013/Tests/All
> 
> Thanks!
> 
> In any case, propagation of this check-in will result in
> (expected/unavoidable) conflicts in monetdb5/mal/mal_dataflow.[ch]
> and (avoidable) conflicts in sql/test/BugTracker-2013/Tests
> 
> All of these will need to be resolved by hand ...
> 
> 
> Stefan
> 
> 
> ----- Original Message -----
> > Changeset: 489815265a61 for MonetDB
> > URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=489815265a61
> > Added Files:
> > 	sql/test/BugTracker-2013/Tests/nestedcalls.stable.err
> > 	sql/test/BugTracker-2013/Tests/nestedcalls.stable.out
> > Modified Files:
> > 	monetdb5/mal/mal_dataflow.c
> > 	monetdb5/mal/mal_dataflow.h
> > 	monetdb5/modules/mal/language.c
> > 	monetdb5/modules/mal/language.h
> > 	sql/test/BugTracker-2013/Tests/All
> > Branch: Feb2013
> > Log Message:
> > 
> > Organize parallelism per worker pool
> > Each (recursive) dataflow block is handed a worker pool now.
> > Once we run out of worker pools or fail to create a worker,
> > we continue in sequential mode.
> > 
> > Cleaning up the threads is implicit, like all other non-pool
> > interpreters that might be active at system shutdown.
> > 
> > 
> > diffs (truncated from 762 to 300 lines):
> > 
> > diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
> > --- a/monetdb5/mal/mal_dataflow.c
> > +++ b/monetdb5/mal/mal_dataflow.c
> > @@ -35,6 +35,7 @@
> >   * The flow graphs should be organized such that parallel threads can
> >   * access it mostly without expensive locking.
> >   */
> > +#include "monetdb_config.h"
> >  #include "mal_dataflow.h"
> >  #include "mal_client.h"
> >  
> > @@ -82,10 +83,10 @@ typedef struct DATAFLOW {
> >  	Queue *done;        /* instructions handled */
> >  } *DataFlow, DataFlowRec;
> >  
> > -#define MAXQ 1024
> > -static MT_Id workers[THREADS] = {0};
> > -static int workerqueue[THREADS] = {0}; /* maps workers towards the todo
> > queues */
> > -static Queue *todo[MAXQ] = {0};	/* pending instructions organized by user
> > MAXTODO > #users */
> > +#define MAXQ 256
> > +static Queue *todos[MAXQ] = {0};	/* pending instructions organized by
> > dataflow block */
> > +static bit occupied[MAXQ]={0}; 		/* worker pool is in use? */
> > +static int volatile exiting = 0;
> >  
> >  /*
> >   * Calculate the size of the dataflow dependency graph.
> > @@ -108,9 +109,8 @@ DFLOWgraphSize(MalBlkPtr mb, int start,
> >   */
> >  
> >  static Queue*
> > -q_create(int sz)
> > +q_create(int sz, const char *name)
> >  {
> > -	const char* name = "q_create";
> >  	Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
> >  
> >  	if (q == NULL)
> > @@ -208,6 +208,8 @@ q_dequeue(Queue *q)
> >  
> >  	assert(q);
> >  	MT_sema_down(&q->s, "q_dequeue");
> > +	if (exiting)
> > +		return NULL;
> >  	MT_lock_set(&q->l, "q_dequeue");
> >  	assert(q->last > 0);
> >  	if (q->last > 0) {
> > @@ -255,25 +257,23 @@ DFLOWworker(void *t)
> >  {
> >  	DataFlow flow;
> >  	FlowEvent fe = 0, fnxt = 0;
> > -	int id = (int) ((MT_Id *) t - workers), last = 0;
> > -	int wq;
> >  	Thread thr;
> >  	str error = 0;
> > -
> > -	int i;
> > -	lng usec = 0;
> > +	Queue *todo = *(Queue **) t;
> > +	int i,last;
> >  
> >  	thr = THRnew("DFLOWworker");
> >  
> >  	GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
> >  	GDKerrbuf[0] = 0;
> >  	while (1) {
> > -		assert(workerqueue[id] > 0);
> > -		wq = workerqueue[id] - 1;
> >  		if (fnxt == 0)
> > -			fe = q_dequeue(todo[wq]);
> > +			fe = q_dequeue(todo);
> >  		else
> >  			fe = fnxt;
> > +		if (exiting) {
> > +			break;
> > +		}
> >  		fnxt = 0;
> >  		assert(fe);
> >  		flow = fe->flow;
> > @@ -285,22 +285,20 @@ DFLOWworker(void *t)
> >  			continue;
> >  		}
> >  
> > -		usec = GDKusec();
> >  		/* skip all instructions when we have encontered an error */
> >  		if (flow->error == 0) {
> >  #ifdef USE_MAL_ADMISSION
> >  			if (MALadmission(fe->argclaim, fe->hotclaim)) {
> >  				fe->hotclaim = 0;   /* don't assume priority anymore */
> > -				assert(todo[wq]);
> > -				if (todo[wq]->last == 0)
> > +				if (todo->last == 0)
> >  					MT_sleep_ms(DELAYUNIT);
> > -				q_requeue(todo[wq], fe);
> > +				q_requeue(todo, fe);
> >  				continue;
> >  			}
> >  #endif
> >  			error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1,
> >  			flow->stk, 0, 0);
> >  			PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= "
> >  			LLFMT
> >  			"," LLFMT " %s\n",
> > -								  fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
> > +								  fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim,
> > error ? error : "");
> >  #ifdef USE_MAL_ADMISSION
> >  			/* release the memory claim */
> >  			MALadmission(-fe->argclaim, -fe->hotclaim);
> > @@ -331,8 +329,8 @@ DFLOWworker(void *t)
> >  		InstrPtr p = getInstrPtr(flow->mb, fe->pc);
> >  		assert(p);
> >  		fe->hotclaim = 0;
> > -		for (i = 0; i < p->retc; i++)
> > -			fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE);
> > +		//for (i = 0; i < p->retc; i++)
> > +			//fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
> >  		}
> >  #endif
> >  		MT_lock_set(&flow->flowlock, "MALworker");
> > @@ -351,56 +349,64 @@ DFLOWworker(void *t)
> >  
> >  		q_enqueue(flow->done, fe);
> >  		if ( fnxt == 0) {
> > -			assert(todo[wq]);
> > -			if (todo[wq]->last == 0)
> > +			if (todo->last == 0)
> >  				profilerHeartbeatEvent("wait");
> > -			else
> > -				MALresourceFairness(NULL, NULL, usec);
> >  		}
> >  	}
> >  	GDKfree(GDKerrbuf);
> >  	GDKsetbuf(0);
> > -	workerqueue[wq] = 0;
> > -	workers[wq] = 0;
> >  	THRdel(thr);
> >  }
> >  
> >  /*
> > - * Create a set of DFLOW interpreters.
> > + * Create an interpreter pool.
> >   * One worker will adaptively be available for each client.
> >   * The remainder are taken from the GDKnr_threads argument and
> > - * typically is equal to the number of cores
> > + * typically is equal to the number of cores.
> > + * A recursive MAL function call would make for one worker less,
> > + * which limits the number of cores for parallel processing.
> >   * The workers are assembled in a local table to enable debugging.
> > + *
> > + * BEWARE, failure to create a new worker thread is not an error
> > + * but would lead to serial execution.
> >   */
> > -static void
> > -DFLOWinitialize(int index)
> > +static int
> > +DFLOWinitialize(void)
> >  {
> > -	int i, worker, limit;
> > +	int i, threads, grp;
> > +	MT_Id worker;
> >  
> > -	assert(index >= 0);
> > -	assert(index < THREADS);
> > +	threads = GDKnr_threads ? GDKnr_threads : 1;
> >  	MT_lock_set(&mal_contextLock, "DFLOWinitialize");
> > -	if (todo[index]) {
> > -		MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
> > -		return;
> > +	for(grp = 0; grp< MAXQ; grp++)
> > +		if ( occupied[grp] == FALSE){
> > +			occupied[grp] = TRUE;
> > +			break;
> > +		}
> > +	MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
> > +	if (grp > THREADS) {
> > +		// continue non-parallel
> > +		return -1;
> >  	}
> > -	todo[index] = q_create(2048);
> > -	assert(todo[index]);
> > -	limit = GDKnr_threads ? GDKnr_threads : 1;
> > -	assert(limit <= THREADS);
> > -	for (worker = 0, i = 0; i < limit; i++){
> > -		for (; worker < THREADS; worker++)
> > -			if( workers[worker] == 0)
> > -				break;
> > -		assert(worker < THREADS);
> > -		if (worker < THREADS) {
> > -			assert(workers[worker] == 0);
> > -			MT_create_thread(&workers[worker], DFLOWworker, (void *)
> > &workers[worker], MT_THR_JOINABLE);
> > -			assert(workers[worker] > 0);
> > -			workerqueue[worker] = index + 1;
> > +	if ( todos[grp] )
> > +		return grp;
> > +
> > +	todos[grp] = q_create(2048, "todo");
> > +	if (todos[grp] == NULL)
> > +		return -1;
> > +
> > +	// associate a set of workers with the pool
> > +	for (i = 0; grp>= 0 && i < threads; i++){
> > +		if (MT_create_thread(&worker, DFLOWworker, (void *) &todos[grp],
> > MT_THR_JOINABLE) < 0) {
> > +			//Can not create interpreter thread
> > +			grp = -1;
> > +		}
> > +		if (worker == 0) {
> > +			//Failed to create interpreter thread
> > +			grp = -1;
> >  		}
> >  	}
> > -	MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
> > +	return grp;
> >  }
> >   
> >  /*
> > @@ -409,18 +415,28 @@ DFLOWinitialize(int index)
> >   * For each instruction we keep a list of instructions whose
> >   * blocking counter should be decremented upon finishing it.
> >   */
> > -static void
> > +static str
> >  DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
> >  {
> >  	int pc, i, j, k, l, n, etop = 0;
> >  	int *assign;
> >  	InstrPtr p;
> >  
> > +	if (flow == NULL)
> > +		throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
> > +	if (mb == NULL)
> > +		throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
> >  	PARDEBUG printf("Initialize dflow block\n");
> >  	assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
> > +	if (assign == NULL)
> > +		throw(MAL, "dataflow", "DFLOWinitBlk(): Failed to allocate assign");
> >  	etop = flow->stop - flow->start;
> >  	for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
> >  		p = getInstrPtr(mb, pc);
> > +		if (p == NULL) {
> > +			GDKfree(assign);
> > +			throw(MAL, "dataflow", "DFLOWinitBlk(): getInstrPtr() returned NULL");
> > +		}
> >  
> >  		/* initial state, ie everything can run */
> >  		flow->status[n].flow = flow;
> > @@ -501,6 +517,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
> >  #ifdef USE_MAL_ADMISSION
> >  	memorypool = memoryclaims = 0;
> >  #endif
> > +	return MAL_SUCCEED;
> >  }
> >  
> >  /*
> > @@ -528,18 +545,17 @@ static void showFlowEvent(DataFlow flow,
> >  */
> >  
> >  static str
> > -DFLOWscheduler(DataFlow flow)
> > +DFLOWscheduler(DataFlow flow, Queue *todo)
> >  {
> >  	int last;
> >  	int i;
> >  #ifdef USE_MAL_ADMISSION
> > -	int j;
> > +	//int j;
> >  	InstrPtr p;
> >  #endif
> >  	int tasks=0, actions;
> >  	str ret = MAL_SUCCEED;
> >  	FlowEvent fe, f = 0;
> > -	int wq;
> >  
> >  	if (flow == NULL)
> >  		throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
> > @@ -549,19 +565,19 @@ DFLOWscheduler(DataFlow flow)
> >  	/* initialize the eligible statements */
> >  	fe = flow->status;
> >  
> > -	if (fe[0].flow->cntxt->flags & timerFlag)
> > -		fe[0].flow->cntxt->timer = GDKusec();
> > -
> >  	MT_lock_set(&flow->flowlock, "MALworker");
> > -	wq = flow->cntxt->idx;
> >  	for (i = 0; i < actions; i++)
> >  		if (fe[i].blocks == 0) {
> >  #ifdef USE_MAL_ADMISSION
> >  			p = getInstrPtr(flow->mb,fe[i].pc);
> > -			for (j = p->retc; j < p->argc; j++)
> > -				fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk,
> > fe[i].pc, j, FALSE);
> > +			if (p == NULL) {
> > +				MT_lock_unset(&flow->flowlock, "MALworker");
> > +				throw(MAL, "dataflow", "DFLOWscheduler():
> > getInstrPtr(flow->mb,fe[i].pc)
> > returned NULL");
> > +			}
> > +			//for (j = p->retc; j < p->argc; j++)
> > +				//fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p,
> > j,
> > FALSE);
> >  #endif
> > -			q_enqueue(todo[wq], flow->status + i);
> > +			q_enqueue(todo, flow->status + i);
> >  			flow->status[i].state = DFLOWrunning;
> >  			PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n",
> >  			flow->status[i].pc, flow->status[i].argclaim);
> >  		}
> > @@ -571,6 +587,10 @@ DFLOWscheduler(DataFlow flow)
> >  
> >  	while (actions != tasks ) {
> > _______________________________________________
> > checkin-list mailing list
> > checkin-list at monetdb.org
> > http://mail.monetdb.org/mailman/listinfo/checkin-list
> > 
> 
> --
> | Stefan.Manegold at CWI.nl | DB Architectures   (DA) |
> | www.CWI.nl/~manegold/  | Science Park 123 (L321) |
> | +31 (0)20 592-4212     | 1098 XG Amsterdam  (NL) |
> 
> _______________________________________________
> developers-list mailing list
> developers-list at monetdb.org
> http://mail.monetdb.org/mailman/listinfo/developers-list
> 

-- 
| Stefan.Manegold at CWI.nl | DB Architectures   (DA) |
| www.CWI.nl/~manegold/  | Science Park 123 (L321) |
| +31 (0)20 592-4212     | 1098 XG Amsterdam  (NL) |




More information about the developers-list mailing list