MonetDB: Feb2013 - Organize parallelism per worker pool

Stefan Manegold Stefan.Manegold at cwi.nl
Wed Aug 21 10:00:49 CEST 2013


Martin,

could you please also add
sql/test/BugTracker-2013/Tests/nestedcalls.sql ?

And could you please also add the test scripts and stable output for the tests
sql/test/BugTracker-2013/Tests/oid_handling
sql/test/BugTracker-2013/Tests/constraint_checking.Bug_3335
sql/test/BugTracker-2013/Tests/pivot.Bug-3339
sql/test/BugTracker-2013/Tests/recursion
or
alternatively remove them from
sql/test/BugTracker-2013/Tests/All

Thanks!

In any case, propagation of this checkin will result in (expected/unavoidable) conflicts in monetdb5/mal/mal_dataflow.[ch]
and (avoidable) conflicts in sql/test/BugTracker-2013/Tests

All of these will need to be resolved by hand ...


Stefan


----- Original Message -----
> Changeset: 489815265a61 for MonetDB
> URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=489815265a61
> Added Files:
> 	sql/test/BugTracker-2013/Tests/nestedcalls.stable.err
> 	sql/test/BugTracker-2013/Tests/nestedcalls.stable.out
> Modified Files:
> 	monetdb5/mal/mal_dataflow.c
> 	monetdb5/mal/mal_dataflow.h
> 	monetdb5/modules/mal/language.c
> 	monetdb5/modules/mal/language.h
> 	sql/test/BugTracker-2013/Tests/All
> Branch: Feb2013
> Log Message:
> 
> Organize parallelism per worker pool
> Each (recursive) dataflow block is handed a worker pool now.
> Once we run out of worker pools or fail to create a worker,
> we continue in sequential mode.
> 
> Cleaning up the threads is implicit, like all other non-pool
> interpreters that might be active at system shutdown.
> 
> 
> diffs (truncated from 762 to 300 lines):
> 
> diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
> --- a/monetdb5/mal/mal_dataflow.c
> +++ b/monetdb5/mal/mal_dataflow.c
> @@ -35,6 +35,7 @@
>   * The flow graphs should be organized such that parallel threads can
>   * access it mostly without expensive locking.
>   */
> +#include "monetdb_config.h"
>  #include "mal_dataflow.h"
>  #include "mal_client.h"
>  
> @@ -82,10 +83,10 @@ typedef struct DATAFLOW {
>  	Queue *done;        /* instructions handled */
>  } *DataFlow, DataFlowRec;
>  
> -#define MAXQ 1024
> -static MT_Id workers[THREADS] = {0};
> -static int workerqueue[THREADS] = {0}; /* maps workers towards the todo queues */
> -static Queue *todo[MAXQ] = {0};	/* pending instructions organized by user MAXTODO > #users */
> +#define MAXQ 256
> +static Queue *todos[MAXQ] = {0};	/* pending instructions organized by dataflow block */
> +static bit occupied[MAXQ]={0}; 		/* worker pool is in use? */
> +static int volatile exiting = 0;
>  
>  /*
>   * Calculate the size of the dataflow dependency graph.
> @@ -108,9 +109,8 @@ DFLOWgraphSize(MalBlkPtr mb, int start,
>   */
>  
>  static Queue*
> -q_create(int sz)
> +q_create(int sz, const char *name)
>  {
> -	const char* name = "q_create";
>  	Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
>  
>  	if (q == NULL)
> @@ -208,6 +208,8 @@ q_dequeue(Queue *q)
>  
>  	assert(q);
>  	MT_sema_down(&q->s, "q_dequeue");
> +	if (exiting)
> +		return NULL;
>  	MT_lock_set(&q->l, "q_dequeue");
>  	assert(q->last > 0);
>  	if (q->last > 0) {
> @@ -255,25 +257,23 @@ DFLOWworker(void *t)
>  {
>  	DataFlow flow;
>  	FlowEvent fe = 0, fnxt = 0;
> -	int id = (int) ((MT_Id *) t - workers), last = 0;
> -	int wq;
>  	Thread thr;
>  	str error = 0;
> -
> -	int i;
> -	lng usec = 0;
> +	Queue *todo = *(Queue **) t;
> +	int i,last;
>  
>  	thr = THRnew("DFLOWworker");
>  
>  	GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
>  	GDKerrbuf[0] = 0;
>  	while (1) {
> -		assert(workerqueue[id] > 0);
> -		wq = workerqueue[id] - 1;
>  		if (fnxt == 0)
> -			fe = q_dequeue(todo[wq]);
> +			fe = q_dequeue(todo);
>  		else
>  			fe = fnxt;
> +		if (exiting) {
> +			break;
> +		}
>  		fnxt = 0;
>  		assert(fe);
>  		flow = fe->flow;
> @@ -285,22 +285,20 @@ DFLOWworker(void *t)
>  			continue;
>  		}
>  
> -		usec = GDKusec();
>  		/* skip all instructions when we have encontered an error */
>  		if (flow->error == 0) {
>  #ifdef USE_MAL_ADMISSION
>  			if (MALadmission(fe->argclaim, fe->hotclaim)) {
>  				fe->hotclaim = 0;   /* don't assume priority anymore */
> -				assert(todo[wq]);
> -				if (todo[wq]->last == 0)
> +				if (todo->last == 0)
>  					MT_sleep_ms(DELAYUNIT);
> -				q_requeue(todo[wq], fe);
> +				q_requeue(todo, fe);
>  				continue;
>  			}
>  #endif
>  			error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1,
>  			flow->stk, 0, 0);
>  			PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT
>  			"," LLFMT " %s\n",
> -								  fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
> +								  fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : "");
>  #ifdef USE_MAL_ADMISSION
>  			/* release the memory claim */
>  			MALadmission(-fe->argclaim, -fe->hotclaim);
> @@ -331,8 +329,8 @@ DFLOWworker(void *t)
>  		InstrPtr p = getInstrPtr(flow->mb, fe->pc);
>  		assert(p);
>  		fe->hotclaim = 0;
> -		for (i = 0; i < p->retc; i++)
> -			fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE);
> +		//for (i = 0; i < p->retc; i++)
> +			//fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
>  		}
>  #endif
>  		MT_lock_set(&flow->flowlock, "MALworker");
> @@ -351,56 +349,64 @@ DFLOWworker(void *t)
>  
>  		q_enqueue(flow->done, fe);
>  		if ( fnxt == 0) {
> -			assert(todo[wq]);
> -			if (todo[wq]->last == 0)
> +			if (todo->last == 0)
>  				profilerHeartbeatEvent("wait");
> -			else
> -				MALresourceFairness(NULL, NULL, usec);
>  		}
>  	}
>  	GDKfree(GDKerrbuf);
>  	GDKsetbuf(0);
> -	workerqueue[wq] = 0;
> -	workers[wq] = 0;
>  	THRdel(thr);
>  }
>  
>  /*
> - * Create a set of DFLOW interpreters.
> + * Create an interpreter pool.
>   * One worker will adaptively be available for each client.
>   * The remainder are taken from the GDKnr_threads argument and
> - * typically is equal to the number of cores
> + * typically is equal to the number of cores.
> + * A recursive MAL function call would make for one worker less,
> + * which limits the number of cores for parallel processing.
>   * The workers are assembled in a local table to enable debugging.
> + *
> + * BEWARE, failure to create a new worker thread is not an error
> + * but would lead to serial execution.
>   */
> -static void
> -DFLOWinitialize(int index)
> +static int
> +DFLOWinitialize(void)
>  {
> -	int i, worker, limit;
> +	int i, threads, grp;
> +	MT_Id worker;
>  
> -	assert(index >= 0);
> -	assert(index < THREADS);
> +	threads = GDKnr_threads ? GDKnr_threads : 1;
>  	MT_lock_set(&mal_contextLock, "DFLOWinitialize");
> -	if (todo[index]) {
> -		MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
> -		return;
> +	for(grp = 0; grp< MAXQ; grp++)
> +		if ( occupied[grp] == FALSE){
> +			occupied[grp] = TRUE;
> +			break;
> +		}
> +	MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
> +	if (grp > THREADS) {
> +		// continue non-parallel
> +		return -1;
>  	}
> -	todo[index] = q_create(2048);
> -	assert(todo[index]);
> -	limit = GDKnr_threads ? GDKnr_threads : 1;
> -	assert(limit <= THREADS);
> -	for (worker = 0, i = 0; i < limit; i++){
> -		for (; worker < THREADS; worker++)
> -			if( workers[worker] == 0)
> -				break;
> -		assert(worker < THREADS);
> -		if (worker < THREADS) {
> -			assert(workers[worker] == 0);
> -			MT_create_thread(&workers[worker], DFLOWworker, (void *) &workers[worker], MT_THR_JOINABLE);
> -			assert(workers[worker] > 0);
> -			workerqueue[worker] = index + 1;
> +	if ( todos[grp] )
> +		return grp;
> +
> +	todos[grp] = q_create(2048, "todo");
> +	if (todos[grp] == NULL)
> +		return -1;
> +
> +	// associate a set of workers with the pool
> +	for (i = 0; grp>= 0 && i < threads; i++){
> +		if (MT_create_thread(&worker, DFLOWworker, (void *) &todos[grp], MT_THR_JOINABLE) < 0) {
> +			//Can not create interpreter thread
> +			grp = -1;
> +		}
> +		if (worker == 0) {
> +			//Failed to create interpreter thread
> +			grp = -1;
>  		}
>  	}
> -	MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
> +	return grp;
>  }
>   
>  /*
> @@ -409,18 +415,28 @@ DFLOWinitialize(int index)
>   * For each instruction we keep a list of instructions whose
>   * blocking counter should be decremented upon finishing it.
>   */
> -static void
> +static str
>  DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
>  {
>  	int pc, i, j, k, l, n, etop = 0;
>  	int *assign;
>  	InstrPtr p;
>  
> +	if (flow == NULL)
> +		throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
> +	if (mb == NULL)
> +		throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
>  	PARDEBUG printf("Initialize dflow block\n");
>  	assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
> +	if (assign == NULL)
> +		throw(MAL, "dataflow", "DFLOWinitBlk(): Failed to allocate assign");
>  	etop = flow->stop - flow->start;
>  	for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
>  		p = getInstrPtr(mb, pc);
> +		if (p == NULL) {
> +			GDKfree(assign);
> +			throw(MAL, "dataflow", "DFLOWinitBlk(): getInstrPtr() returned NULL");
> +		}
>  
>  		/* initial state, ie everything can run */
>  		flow->status[n].flow = flow;
> @@ -501,6 +517,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
>  #ifdef USE_MAL_ADMISSION
>  	memorypool = memoryclaims = 0;
>  #endif
> +	return MAL_SUCCEED;
>  }
>  
>  /*
> @@ -528,18 +545,17 @@ static void showFlowEvent(DataFlow flow,
>  */
>  
>  static str
> -DFLOWscheduler(DataFlow flow)
> +DFLOWscheduler(DataFlow flow, Queue *todo)
>  {
>  	int last;
>  	int i;
>  #ifdef USE_MAL_ADMISSION
> -	int j;
> +	//int j;
>  	InstrPtr p;
>  #endif
>  	int tasks=0, actions;
>  	str ret = MAL_SUCCEED;
>  	FlowEvent fe, f = 0;
> -	int wq;
>  
>  	if (flow == NULL)
>  		throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
> @@ -549,19 +565,19 @@ DFLOWscheduler(DataFlow flow)
>  	/* initialize the eligible statements */
>  	fe = flow->status;
>  
> -	if (fe[0].flow->cntxt->flags & timerFlag)
> -		fe[0].flow->cntxt->timer = GDKusec();
> -
>  	MT_lock_set(&flow->flowlock, "MALworker");
> -	wq = flow->cntxt->idx;
>  	for (i = 0; i < actions; i++)
>  		if (fe[i].blocks == 0) {
>  #ifdef USE_MAL_ADMISSION
>  			p = getInstrPtr(flow->mb,fe[i].pc);
> -			for (j = p->retc; j < p->argc; j++)
> -				fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, fe[i].pc, j, FALSE);
> +			if (p == NULL) {
> +				MT_lock_unset(&flow->flowlock, "MALworker");
> +				throw(MAL, "dataflow", "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
> +			}
> +			//for (j = p->retc; j < p->argc; j++)
> +				//fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j, FALSE);
>  #endif
> -			q_enqueue(todo[wq], flow->status + i);
> +			q_enqueue(todo, flow->status + i);
>  			flow->status[i].state = DFLOWrunning;
>  			PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n",
>  			flow->status[i].pc, flow->status[i].argclaim);
>  		}
> @@ -571,6 +587,10 @@ DFLOWscheduler(DataFlow flow)
>  
>  	while (actions != tasks ) {
> _______________________________________________
> checkin-list mailing list
> checkin-list at monetdb.org
> http://mail.monetdb.org/mailman/listinfo/checkin-list
> 
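
A minimal, self-contained sketch of the worker-pool fallback described in the quoted
log message, using plain pthreads with made-up names (pool_acquire, pool_release,
MAXPOOLS, WORKERS_PER_POOL); it is not the actual MonetDB MT_/GDK code, just the idea:
each dataflow block tries to grab a free pool and spawn workers for it, and runs
sequentially when no pool is free or a worker thread cannot be created.

    /* Sketch only: "one worker pool per dataflow block, fall back to
     * sequential execution when no pool or worker is available". */
    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    #define MAXPOOLS 4              /* cf. MAXQ: upper bound on concurrent pools */
    #define WORKERS_PER_POOL 2      /* cf. GDKnr_threads: workers per pool */

    static bool occupied[MAXPOOLS];                 /* pool slot in use? */
    static pthread_mutex_t pool_lock = PTHREAD_MUTEX_INITIALIZER;

    static void *worker(void *arg)
    {
        int pool = *(int *) arg;
        /* ... dequeue instructions from this pool's todo queue and run them ... */
        printf("worker running for pool %d\n", pool);
        return NULL;
    }

    static void pool_release(int grp)
    {
        pthread_mutex_lock(&pool_lock);
        occupied[grp] = false;
        pthread_mutex_unlock(&pool_lock);
    }

    /* Returns the pool index, or -1 to signal "execute this block sequentially". */
    static int pool_acquire(pthread_t threads[WORKERS_PER_POOL], int *pool_id)
    {
        int grp = -1;

        pthread_mutex_lock(&pool_lock);
        for (int i = 0; i < MAXPOOLS; i++)
            if (!occupied[i]) { occupied[i] = true; grp = i; break; }
        pthread_mutex_unlock(&pool_lock);
        if (grp < 0)
            return -1;                              /* all pools taken */

        *pool_id = grp;
        for (int i = 0; i < WORKERS_PER_POOL; i++) {
            if (pthread_create(&threads[i], NULL, worker, pool_id) != 0) {
                /* not an error: join what we have and fall back to sequential */
                for (int j = 0; j < i; j++)
                    pthread_join(threads[j], NULL);
                pool_release(grp);
                return -1;
            }
        }
        return grp;
    }

    int main(void)
    {
        pthread_t threads[WORKERS_PER_POOL];
        int pool_id;

        if (pool_acquire(threads, &pool_id) < 0) {
            puts("no pool/worker available: running the dataflow block sequentially");
        } else {
            for (int i = 0; i < WORKERS_PER_POOL; i++)
                pthread_join(threads[i], NULL);
            pool_release(pool_id);
        }
        return 0;
    }

In the actual patch the pool slot (occupied[grp]) and the per-pool todo queue
(todos[grp]) live in mal_dataflow.c, and thread creation goes through
MT_create_thread rather than pthreads.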

-- 
| Stefan.Manegold at CWI.nl | DB Architectures   (DA) |
| www.CWI.nl/~manegold/  | Science Park 123 (L321) |
| +31 (0)20 592-4212     | 1098 XG Amsterdam  (NL) |



