MonetDB: Feb2013 - Organize parallelism per worker pool

Stefan Manegold Stefan.Manegold at cwi.nl
Wed Aug 21 10:46:23 CEST 2013


Martin,

Compilation and testing appear to work without these two new functions, too.

I'll go ahead and remove them again, if you don't mind.

Could you please take care of the tests I mention below?
(Or let me know what I shall do?)

Thanks!
Stefan

----- Original Message -----
> Martin,
> 
> Also, by adding and exporting the new functions stopMALdataflow and
> CMDcallFunction,
> this changeset changes the API,
> and thus must not remain as-is in the Feb2013 release branch.
> 
> Are these two new functions indeed inherently required for fixing bug 3346?
> 
> At least I cannot even find any place where these new functions are used /
> called ...
> 
> Thanks!
> Stefan
> 
> ----- Original Message -----
> > Martin,
> > 
> > could you please also add
> > sql/test/BugTracker-2013/Tests/nestedcalls.sql ?
> > 
> > And
> > could you please also add the test scripts and stable output for tests
> > sql/test/BugTracker-2013/Tests/oid_handling
> > sql/test/BugTracker-2013/Tests/constraint_checking.Bug_3335
> > sql/test/BugTracker-2013/Tests/pivot.Bug-3339
> > sql/test/BugTracker-2013/Tests/recursion
> > or
> > alternatively remove them from
> > sql/test/BugTracker-2013/Tests/All
> > 
> > Thanks!
> > 
> > In any case, propagation of this checkin will result in
> > (expected/unavoidable) conflicts in monetdb5/mal/mal_dataflow.[ch]
> > and (avoidable) conflicts in sql/test/BugTracker-2013/Tests
> > 
> > All of these will need to be solved by hand ...
> > 
> > 
> > Stefan
> > 
> > 
> > ----- Original Message -----
> > > Changeset: 489815265a61 for MonetDB
> > > URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=489815265a61
> > > Added Files:
> > > 	sql/test/BugTracker-2013/Tests/nestedcalls.stable.err
> > > 	sql/test/BugTracker-2013/Tests/nestedcalls.stable.out
> > > Modified Files:
> > > 	monetdb5/mal/mal_dataflow.c
> > > 	monetdb5/mal/mal_dataflow.h
> > > 	monetdb5/modules/mal/language.c
> > > 	monetdb5/modules/mal/language.h
> > > 	sql/test/BugTracker-2013/Tests/All
> > > Branch: Feb2013
> > > Log Message:
> > > 
> > > Organize parallelism per worker pool
> > > Each (recursive) dataflow block is handed a worker pool now.
> > > Once we run out of worker pools or fail to create a worker,
> > > we continue in sequential mode.
> > > 
> > > Cleaning up the threads is implicit, like all other non-pool
> > > interpreters that might be active at system shutdown.
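The log message above describes handing each (recursive) dataflow block its own worker pool and falling back to sequential mode once pools run out. A minimal sketch of that slot-allocation idea follows, assuming POSIX threads in place of MonetDB's MT_lock_set/MT_lock_unset. The names pool_acquire and pool_release and the small MAXQ value are hypothetical, chosen for illustration; this is not the code from the changeset.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define MAXQ 4  /* hypothetical pool count, kept small for illustration */

static bool occupied[MAXQ];  /* is worker pool grp in use? */
static pthread_mutex_t pool_lock = PTHREAD_MUTEX_INITIALIZER;

/* Claim the first free worker pool. Returns its index, or -1 when every
 * pool is occupied; the caller then runs the dataflow block sequentially. */
static int pool_acquire(void)
{
    int grp;

    pthread_mutex_lock(&pool_lock);
    for (grp = 0; grp < MAXQ; grp++)
        if (!occupied[grp]) {
            occupied[grp] = true;
            break;
        }
    pthread_mutex_unlock(&pool_lock);
    /* The loop leaves grp == MAXQ when no free slot was found. */
    return grp < MAXQ ? grp : -1;
}

/* Hand a pool back once its dataflow block has finished. */
static void pool_release(int grp)
{
    assert(grp >= 0 && grp < MAXQ);
    pthread_mutex_lock(&pool_lock);
    occupied[grp] = false;
    pthread_mutex_unlock(&pool_lock);
}
```

With MAXQ pools handed out, a further nested call gets -1 and runs its block sequentially instead of waiting for a pool, which matches the "continue in sequential mode" behaviour described in the log message. A failure to start worker threads for a freshly claimed pool could be reported the same way, by returning -1.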
> > > 
> > > 
> > > diffs (truncated from 762 to 300 lines):
> > > 
> > > diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
> > > --- a/monetdb5/mal/mal_dataflow.c
> > > +++ b/monetdb5/mal/mal_dataflow.c
> > > @@ -35,6 +35,7 @@
> > >   * The flow graphs should be organized such that parallel threads can
> > >   * access it mostly without expensive locking.
> > >   */
> > > +#include "monetdb_config.h"
> > >  #include "mal_dataflow.h"
> > >  #include "mal_client.h"
> > >  
> > > @@ -82,10 +83,10 @@ typedef struct DATAFLOW {
> > >  	Queue *done;        /* instructions handled */
> > >  } *DataFlow, DataFlowRec;
> > >  
> > > -#define MAXQ 1024
> > > -static MT_Id workers[THREADS] = {0};
> > > -static int workerqueue[THREADS] = {0}; /* maps workers towards the todo queues */
> > > -static Queue *todo[MAXQ] = {0};	/* pending instructions organized by user MAXTODO > #users */
> > > +#define MAXQ 256
> > > +static Queue *todos[MAXQ] = {0};	/* pending instructions organized by dataflow block */
> > > +static bit occupied[MAXQ]={0}; 		/* worker pool is in use? */
> > > +static int volatile exiting = 0;
> > >  
> > >  /*
> > >   * Calculate the size of the dataflow dependency graph.
> > > @@ -108,9 +109,8 @@ DFLOWgraphSize(MalBlkPtr mb, int start,
> > >   */
> > >  
> > >  static Queue*
> > > -q_create(int sz)
> > > +q_create(int sz, const char *name)
> > >  {
> > > -	const char* name = "q_create";
> > >  	Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
> > >  
> > >  	if (q == NULL)
> > > @@ -208,6 +208,8 @@ q_dequeue(Queue *q)
> > >  
> > >  	assert(q);
> > >  	MT_sema_down(&q->s, "q_dequeue");
> > > +	if (exiting)
> > > +		return NULL;
> > >  	MT_lock_set(&q->l, "q_dequeue");
> > >  	assert(q->last > 0);
> > >  	if (q->last > 0) {
> > > @@ -255,25 +257,23 @@ DFLOWworker(void *t)
> > >  {
> > >  	DataFlow flow;
> > >  	FlowEvent fe = 0, fnxt = 0;
> > > -	int id = (int) ((MT_Id *) t - workers), last = 0;
> > > -	int wq;
> > >  	Thread thr;
> > >  	str error = 0;
> > > -
> > > -	int i;
> > > -	lng usec = 0;
> > > +	Queue *todo = *(Queue **) t;
> > > +	int i,last;
> > >  
> > >  	thr = THRnew("DFLOWworker");
> > >  
> > >  	GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
> > >  	GDKerrbuf[0] = 0;
> > >  	while (1) {
> > > -		assert(workerqueue[id] > 0);
> > > -		wq = workerqueue[id] - 1;
> > >  		if (fnxt == 0)
> > > -			fe = q_dequeue(todo[wq]);
> > > +			fe = q_dequeue(todo);
> > >  		else
> > >  			fe = fnxt;
> > > +		if (exiting) {
> > > +			break;
> > > +		}
> > >  		fnxt = 0;
> > >  		assert(fe);
> > >  		flow = fe->flow;
> > > @@ -285,22 +285,20 @@ DFLOWworker(void *t)
> > >  			continue;
> > >  		}
> > >  
> > > -		usec = GDKusec();
> > >  		/* skip all instructions when we have encontered an error */
> > >  		if (flow->error == 0) {
> > >  #ifdef USE_MAL_ADMISSION
> > >  			if (MALadmission(fe->argclaim, fe->hotclaim)) {
> > >  				fe->hotclaim = 0;   /* don't assume priority anymore */
> > > -				assert(todo[wq]);
> > > -				if (todo[wq]->last == 0)
> > > +				if (todo->last == 0)
> > >  					MT_sleep_ms(DELAYUNIT);
> > > -				q_requeue(todo[wq], fe);
> > > +				q_requeue(todo, fe);
> > >  				continue;
> > >  			}
> > >  #endif
> > >  			error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1,
> > >  			flow->stk, 0, 0);
> > >  			PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= "
> > >  			LLFMT
> > >  			"," LLFMT " %s\n",
> > > -								  fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
> > > +								  fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : "");
> > >  #ifdef USE_MAL_ADMISSION
> > >  			/* release the memory claim */
> > >  			MALadmission(-fe->argclaim, -fe->hotclaim);
> > > @@ -331,8 +329,8 @@ DFLOWworker(void *t)
> > >  		InstrPtr p = getInstrPtr(flow->mb, fe->pc);
> > >  		assert(p);
> > >  		fe->hotclaim = 0;
> > > -		for (i = 0; i < p->retc; i++)
> > > -			fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE);
> > > +		//for (i = 0; i < p->retc; i++)
> > > +			//fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
> > >  		}
> > >  #endif
> > >  		MT_lock_set(&flow->flowlock, "MALworker");
> > > @@ -351,56 +349,64 @@ DFLOWworker(void *t)
> > >  
> > >  		q_enqueue(flow->done, fe);
> > >  		if ( fnxt == 0) {
> > > -			assert(todo[wq]);
> > > -			if (todo[wq]->last == 0)
> > > +			if (todo->last == 0)
> > >  				profilerHeartbeatEvent("wait");
> > > -			else
> > > -				MALresourceFairness(NULL, NULL, usec);
> > >  		}
> > >  	}
> > >  	GDKfree(GDKerrbuf);
> > >  	GDKsetbuf(0);
> > > -	workerqueue[wq] = 0;
> > > -	workers[wq] = 0;
> > >  	THRdel(thr);
> > >  }
> > >  
> > >  /*
> > > - * Create a set of DFLOW interpreters.
> > > + * Create an interpreter pool.
> > >   * One worker will adaptively be available for each client.
> > >   * The remainder are taken from the GDKnr_threads argument and
> > > - * typically is equal to the number of cores
> > > + * typically is equal to the number of cores.
> > > + * A recursive MAL function call would make for one worker less,
> > > + * which limits the number of cores for parallel processing.
> > >   * The workers are assembled in a local table to enable debugging.
> > > + *
> > > + * BEWARE, failure to create a new worker thread is not an error
> > > + * but would lead to serial execution.
> > >   */
> > > -static void
> > > -DFLOWinitialize(int index)
> > > +static int
> > > +DFLOWinitialize(void)
> > >  {
> > > -	int i, worker, limit;
> > > +	int i, threads, grp;
> > > +	MT_Id worker;
> > >  
> > > -	assert(index >= 0);
> > > -	assert(index < THREADS);
> > > +	threads = GDKnr_threads ? GDKnr_threads : 1;
> > >  	MT_lock_set(&mal_contextLock, "DFLOWinitialize");
> > > -	if (todo[index]) {
> > > -		MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
> > > -		return;
> > > +	for(grp = 0; grp< MAXQ; grp++)
> > > +		if ( occupied[grp] == FALSE){
> > > +			occupied[grp] = TRUE;
> > > +			break;
> > > +		}
> > > +	MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
> > > +	if (grp > THREADS) {
> > > +		// continue non-parallel
> > > +		return -1;
> > >  	}
> > > -	todo[index] = q_create(2048);
> > > -	assert(todo[index]);
> > > -	limit = GDKnr_threads ? GDKnr_threads : 1;
> > > -	assert(limit <= THREADS);
> > > -	for (worker = 0, i = 0; i < limit; i++){
> > > -		for (; worker < THREADS; worker++)
> > > -			if( workers[worker] == 0)
> > > -				break;
> > > -		assert(worker < THREADS);
> > > -		if (worker < THREADS) {
> > > -			assert(workers[worker] == 0);
> > > -			MT_create_thread(&workers[worker], DFLOWworker, (void *) &workers[worker], MT_THR_JOINABLE);
> > > -			assert(workers[worker] > 0);
> > > -			workerqueue[worker] = index + 1;
> > > +	if ( todos[grp] )
> > > +		return grp;
> > > +
> > > +	todos[grp] = q_create(2048, "todo");
> > > +	if (todos[grp] == NULL)
> > > +		return -1;
> > > +
> > > +	// associate a set of workers with the pool
> > > +	for (i = 0; grp>= 0 && i < threads; i++){
> > > +		if (MT_create_thread(&worker, DFLOWworker, (void *) &todos[grp], MT_THR_JOINABLE) < 0) {
> > > +			//Can not create interpreter thread
> > > +			grp = -1;
> > > +		}
> > > +		if (worker == 0) {
> > > +			//Failed to create interpreter thread
> > > +			grp = -1;
> > >  		}
> > >  	}
> > > -	MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
> > > +	return grp;
> > >  }
> > >   
> > >  /*
> > > @@ -409,18 +415,28 @@ DFLOWinitialize(int index)
> > >   * For each instruction we keep a list of instructions whose
> > >   * blocking counter should be decremented upon finishing it.
> > >   */
> > > -static void
> > > +static str
> > >  DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
> > >  {
> > >  	int pc, i, j, k, l, n, etop = 0;
> > >  	int *assign;
> > >  	InstrPtr p;
> > >  
> > > +	if (flow == NULL)
> > > +		throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
> > > +	if (mb == NULL)
> > > +		throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
> > >  	PARDEBUG printf("Initialize dflow block\n");
> > >  	assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
> > > +	if (assign == NULL)
> > > +		throw(MAL, "dataflow", "DFLOWinitBlk(): Failed to allocate assign");
> > >  	etop = flow->stop - flow->start;
> > >  	for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
> > >  		p = getInstrPtr(mb, pc);
> > > +		if (p == NULL) {
> > > +			GDKfree(assign);
> > > +			throw(MAL, "dataflow", "DFLOWinitBlk(): getInstrPtr() returned NULL");
> > > +		}
> > >  
> > >  		/* initial state, ie everything can run */
> > >  		flow->status[n].flow = flow;
> > > @@ -501,6 +517,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
> > >  #ifdef USE_MAL_ADMISSION
> > >  	memorypool = memoryclaims = 0;
> > >  #endif
> > > +	return MAL_SUCCEED;
> > >  }
> > >  
> > >  /*
> > > @@ -528,18 +545,17 @@ static void showFlowEvent(DataFlow flow,
> > >  */
> > >  
> > >  static str
> > > -DFLOWscheduler(DataFlow flow)
> > > +DFLOWscheduler(DataFlow flow, Queue *todo)
> > >  {
> > >  	int last;
> > >  	int i;
> > >  #ifdef USE_MAL_ADMISSION
> > > -	int j;
> > > +	//int j;
> > >  	InstrPtr p;
> > >  #endif
> > >  	int tasks=0, actions;
> > >  	str ret = MAL_SUCCEED;
> > >  	FlowEvent fe, f = 0;
> > > -	int wq;
> > >  
> > >  	if (flow == NULL)
> > >  		throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
> > > @@ -549,19 +565,19 @@ DFLOWscheduler(DataFlow flow)
> > >  	/* initialize the eligible statements */
> > >  	fe = flow->status;
> > >  
> > > -	if (fe[0].flow->cntxt->flags & timerFlag)
> > > -		fe[0].flow->cntxt->timer = GDKusec();
> > > -
> > >  	MT_lock_set(&flow->flowlock, "MALworker");
> > > -	wq = flow->cntxt->idx;
> > >  	for (i = 0; i < actions; i++)
> > >  		if (fe[i].blocks == 0) {
> > >  #ifdef USE_MAL_ADMISSION
> > >  			p = getInstrPtr(flow->mb,fe[i].pc);
> > > -			for (j = p->retc; j < p->argc; j++)
> > > -				fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, fe[i].pc, j, FALSE);
> > > +			if (p == NULL) {
> > > +				MT_lock_unset(&flow->flowlock, "MALworker");
> > > +				throw(MAL, "dataflow", "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
> > > +			}
> > > +			//for (j = p->retc; j < p->argc; j++)
> > > +				//fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j, FALSE);
> > >  #endif
> > > -			q_enqueue(todo[wq], flow->status + i);
> > > +			q_enqueue(todo, flow->status + i);
> > >  			flow->status[i].state = DFLOWrunning;
> > >  			PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n",
> > >  			flow->status[i].pc, flow->status[i].argclaim);
> > >  		}
> > > @@ -571,6 +587,10 @@ DFLOWscheduler(DataFlow flow)
> > >  
> > >  	while (actions != tasks ) {
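The diff makes q_dequeue() return NULL when a global `exiting` flag is set after the semaphore wait, and each DFLOWworker receives a pointer to its pool's queue slot (`(void *) &todos[grp]`) and stops when `exiting` is set. A minimal sketch of that shutdown pattern follows, assuming POSIX threads. It swaps MonetDB's MT_sema semaphore for a pthread condition variable, and Queue, QCAP, q_init, q_shutdown and worker are hypothetical names for illustration, not MonetDB's API.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define QCAP 64  /* hypothetical fixed queue capacity */

typedef struct {
    void *items[QCAP];
    int last;                 /* number of queued items */
    bool exiting;             /* set once, at shutdown */
    pthread_mutex_t lock;
    pthread_cond_t nonempty;
} Queue;

static void q_init(Queue *q)
{
    q->last = 0;
    q->exiting = false;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->nonempty, NULL);
}

/* Returns 0 on success, -1 if the queue is full. */
static int q_enqueue(Queue *q, void *item)
{
    int rc = -1;

    pthread_mutex_lock(&q->lock);
    if (q->last < QCAP) {
        q->items[q->last++] = item;
        pthread_cond_signal(&q->nonempty);
        rc = 0;
    }
    pthread_mutex_unlock(&q->lock);
    return rc;
}

/* Blocks until an item is available; returns NULL once shutdown has begun. */
static void *q_dequeue(Queue *q)
{
    void *item = NULL;

    pthread_mutex_lock(&q->lock);
    while (q->last == 0 && !q->exiting)
        pthread_cond_wait(&q->nonempty, &q->lock);
    if (!q->exiting)
        item = q->items[--q->last];   /* LIFO pop, for brevity */
    pthread_mutex_unlock(&q->lock);
    return item;
}

/* Request shutdown and wake every worker blocked in q_dequeue(). */
static void q_shutdown(Queue *q)
{
    pthread_mutex_lock(&q->lock);
    q->exiting = true;
    pthread_cond_broadcast(&q->nonempty);
    pthread_mutex_unlock(&q->lock);
}

/* Worker thread: the argument points at its pool's queue slot. */
static void *worker(void *arg)
{
    Queue *todo = *(Queue **) arg;
    void *fe;

    assert(todo != NULL);
    while ((fe = q_dequeue(todo)) != NULL)
        (*(int *) fe)++;      /* stand-in for executing one instruction */
    return NULL;
}
```

In this sketch the flag is read and written under the queue mutex rather than being a global `volatile int`. In C, `volatile` by itself gives neither atomicity nor inter-thread ordering, so a mutex or a C11 atomic is the portable way to publish such a flag. The broadcast in q_shutdown() wakes all blocked workers at once, so each of them can leave its loop and be joined.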
> > > _______________________________________________
> > > checkin-list mailing list
> > > checkin-list at monetdb.org
> > > http://mail.monetdb.org/mailman/listinfo/checkin-list
> > > 
> > 
> > --
> > | Stefan.Manegold at CWI.nl | DB Architectures   (DA) |
> > | www.CWI.nl/~manegold/  | Science Park 123 (L321) |
> > | +31 (0)20 592-4212     | 1098 XG Amsterdam  (NL) |
> > 
> > _______________________________________________
> > developers-list mailing list
> > developers-list at monetdb.org
> > http://mail.monetdb.org/mailman/listinfo/developers-list
> > 
> 
> 
