MonetDB: Feb2013 - New thread group per dataflow block

Stefan Manegold Stefan.Manegold at cwi.nl
Fri Aug 16 10:05:52 CEST 2013


Martin,

this checkin appears to make several tests run into timeouts;
I have not yet been able to analyze why ...

A complete Mtest.py run results in

* 142 out of 1863 tests could not be executed
    1 out of 1863 tests produced slightly different output
        sql/test/BugTracker-2012/predicate_select.Bug-3090
*   6 out of 1863 tests ran into timeout
*       sql/benchmarks/ATIS/load
*       sql/benchmarks/arno/insert_ATOM
*       sql/benchmarks/arno/insert_BOND
*       sql/benchmarks/arno_flt/init
*       sql/benchmarks/moa/load
*       sql/benchmarks/wisconsin/load
    1 out of 1863 tests caused an abort (assertion failure)
        monetdb5/tests/BugTracker/kunion-and-nil.Bug-1667
    7 out of 1863 tests produced SIGNIFICANTLY different output
        monetdb5/optimizer/inlineFunction
        monetdb5/optimizer/ifthencst
        sql/test/BugTracker-2012/conditions_when_for_triggers_do_not_work.Bug-2073
        sql/test/BugTracker-2012/create_function.Bug-3172
        sql/test/BugTracker-2012/currenttime.Bug-2781
        sql/test/BugTracker-2012/null_except_null.Bug-3040
        sql/test/BugTracker-2012/day-of-month-localization.Bug-2962

while with the version before this checkin, it results in

   41 out of 1863 tests could not be executed
    1 out of 1863 tests produced slightly different output
        sql/test/BugTracker-2012/predicate_select.Bug-3090
    1 out of 1863 tests caused an abort (assertion failure)
        monetdb5/tests/BugTracker/kunion-and-nil.Bug-1667
    7 out of 1863 tests produced SIGNIFICANTLY different output
        monetdb5/optimizer/inlineFunction
        monetdb5/optimizer/ifthencst
        sql/test/BugTracker-2012/conditions_when_for_triggers_do_not_work.Bug-2073
        sql/test/BugTracker-2012/create_function.Bug-3172
        sql/test/BugTracker-2012/currenttime.Bug-2781
        sql/test/BugTracker-2012/null_except_null.Bug-3040
        sql/test/BugTracker-2012/day-of-month-localization.Bug-2962

Best,
Stefan

----- Original Message -----
> Changeset: 3669ddd28bf0 for MonetDB
> URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3669ddd28bf0
> Modified Files:
> 	monetdb5/mal/mal_dataflow.c
> Branch: Feb2013
> Log Message:
> 
> New thread group per dataflow block
> The centralized worker thread group could lead to an unacceptable
> situation: if one user was heavily processing complex queries, no other
> user could even log into the system, because their MAL statements ended
> up at the end of the shared queues.
> 
> The problem has been resolved by introducing a thread group per dataflow
> block. This may lead to a large number of processes, whose resources
> are managed by the OS.
> 
> It solves bug 3258
> 
> 
> diffs (235 lines):
> 
> diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
> --- a/monetdb5/mal/mal_dataflow.c
> +++ b/monetdb5/mal/mal_dataflow.c
> @@ -78,12 +78,12 @@ typedef struct DATAFLOW {
>  	int *nodes;         /* dependency graph nodes */
>  	int *edges;         /* dependency graph */
>  	MT_Lock flowlock;   /* lock to protect the above */
> +	queue *todo;		/* pending instructions */
>  	queue *done;        /* instructions handled */
> +	int threads;		/* worker threads active */
> +	MT_Id workers[THREADS];
>  } *DataFlow, DataFlowRec;
>  
> -static MT_Id workers[THREADS];
> -static queue *todo = 0;	/* pending instructions */
> -
>  /*
>   * Calculate the size of the dataflow dependency graph.
>   */
> @@ -138,7 +138,6 @@ q_destroy(queue *q)
>  static void
>  q_enqueue_(queue *q, FlowEvent d)
>  {
> -	assert(d);
>  	if (q->last == q->size) {
>  		q->size <<= 1;
>  		q->data = GDKrealloc(q->data, sizeof(FlowEvent) * q->size);
> @@ -214,7 +213,6 @@ q_dequeue(queue *q)
>  	 */
>  
>  	MT_lock_unset(&q->l, "q_dequeue");
> -	assert(r);
>  	return r;
>  }
>  
> @@ -239,14 +237,15 @@ q_dequeue(queue *q)
>  static void
>  DFLOWworker(void *t)
>  {
> -	DataFlow flow;
> +	DataFlow flow = (DataFlow) t;
>  	FlowEvent fe = 0, fnxt = 0;
> -	int id = (int) ((MT_Id *) t - workers), last = 0;
> +	MT_Id id = MT_getpid();
> +	int last = 0;
>  	Thread thr;
>  	str error = 0;
>  
>  	int i;
> -	lng usec = 0;
> +	//lng usec = 0;
>  
>  	thr = THRnew("DFLOWworker");
>  
> @@ -254,8 +253,10 @@ DFLOWworker(void *t)
>  	GDKerrbuf[0] = 0;
>  	while (1) {
>  		if (fnxt == 0)
> -			fe = q_dequeue(todo);
> +			fe = q_dequeue(flow->todo);
>  		else fe = fnxt;
> +		if ( fe == 0)
> +			break;
>  		fnxt = 0;
>  		assert(fe);
>  		flow = fe->flow;
> @@ -266,20 +267,20 @@ DFLOWworker(void *t)
>  			continue;
>  		}
>  
> -		usec = GDKusec();
> +		//usec = GDKusec();
>  		/* skip all instructions when we have encontered an error */
>  		if (flow->error == 0) {
>  #ifdef USE_MAL_ADMISSION
>  			if (MALadmission(fe->argclaim, fe->hotclaim)) {
>  				fe->hotclaim = 0;   /* don't assume priority anymore */
> -				if (todo->last == 0)
> +				if (flow->todo->last == 0)
>  					MT_sleep_ms(DELAYUNIT);
> -				q_requeue(todo, fe);
> +				q_requeue(flow->todo, fe);
>  				continue;
>  			}
>  #endif
>  			error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1,
>  			flow->stk, 0, 0);
> -			PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
> +			PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= "SZFMT" claim= " LLFMT "," LLFMT " %s\n",
>  								  fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
>  #ifdef USE_MAL_ADMISSION
>  			/* release the memory claim */
> @@ -330,12 +331,15 @@ DFLOWworker(void *t)
>  
>  		q_enqueue(flow->done, fe);
>  		if ( fnxt == 0) {
> -			if (todo->last == 0)
> +			if (flow->todo->last == 0)
>  				profilerHeartbeatEvent("wait");
> -			else
> -				MALresourceFairness(NULL, NULL, usec);
> +			//else
> +				//MALresourceFairness(NULL, NULL, usec);
>  		}
>  	}
> +	for( i = 0; i< flow->threads; i++)
> +	if ( flow->workers[i] == id)
> +		flow->workers[i] = 0;
>  	GDKfree(GDKerrbuf);
>  	GDKsetbuf(0);
>  	THRdel(thr);
> @@ -349,22 +353,51 @@ DFLOWworker(void *t)
>   * The workers are assembled in a local table to enable debugging.
>   */
>  static void
> -DFLOWinitialize(void)
> +DFLOWinitialize(DataFlow flow, int size)
>  {
> -	int i, limit;
> +	int i;
>  
> -	MT_lock_set(&mal_contextLock, "DFLOWinitialize");
> -	if (todo) {
> -		MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
> -		return;
> +	MT_lock_init(&flow->flowlock, "DFLOWworker");
> +	flow->todo = q_create(size);
> +	flow->done = q_create(size);
> +	flow->threads = GDKnr_threads ? GDKnr_threads :1;
> +	for (i = 0; i < flow->threads; i++){
> +		MT_create_thread(&flow->workers[i], DFLOWworker, (void *) flow, MT_THR_JOINABLE);
> +		/* upon failure of starting threads we reduce the count */
> +		if ( flow->workers[i]== 0){
> +			flow->threads --;
> +			i--;
> +		}
>  	}
> -	todo = q_create(2048);
> -	limit = GDKnr_threads ? GDKnr_threads : 1;
> -	for (i = 0; i < limit; i++)
> -		MT_create_thread(&workers[i], DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE);
> -	MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
>  }
>   
> +static str
> +DFLOWfinalize(DataFlow flow)
> +{
> +	int i, cnt= flow->threads, runs =0;
> +
> +	for( i = 0; i< cnt; i++)
> +		q_enqueue(flow->todo, 0);
> +	/* time out when threads are already killed */
> +	do{
> +		runs++;
> +		cnt = 0;
> +		MT_sleep_ms(1);
> +		for( i = 0; i < flow->threads; i++)
> +			cnt += flow->workers[i] ==0;
> +	} while( cnt != flow->threads && runs <5000);
> +
> +	if ( runs == 5000)
> +		throw(MAL,"dataflow","Timeout on thread termination");
> +	GDKfree(flow->status);
> +	GDKfree(flow->edges);
> +	GDKfree(flow->nodes);
> +	q_destroy(flow->done);
> +	q_destroy(flow->todo);
> +	MT_lock_destroy(&flow->flowlock);
> +	GDKfree(flow);
> +	return MAL_SUCCEED;
> +}
>  /*
>   * The dataflow administration is based on administration of
>   * how many variables are still missing before it can be executed.
> @@ -518,7 +551,7 @@ DFLOWscheduler(DataFlow flow)
>  			for (j = p->retc; j < p->argc; j++)
>  				fe[i].argclaim = getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk,
>  				fe[i].pc, j, FALSE);
>  #endif
> -			q_enqueue(todo, flow->status + i);
> +			q_enqueue(flow->todo, flow->status + i);
>  			flow->status[i].state = DFLOWrunning;
>  			PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=" LLFMT "\n",
>  			flow->status[i].pc, flow->status[i].argclaim);
>  		}
> @@ -543,7 +576,7 @@ DFLOWscheduler(DataFlow flow)
>  				if (flow->status[i].blocks == 1 ) {
>  					flow->status[i].state = DFLOWrunning;
>  					flow->status[i].blocks--;
> -					q_enqueue(todo, flow->status + i);
> +					q_enqueue(flow->todo, flow->status + i);
>  					PARDEBUG
>  					mnstr_printf(GDKstdout, "#enqueue pc=%d claim= " LLFMT "\n",
>  					flow->status[i].pc, flow->status[i].argclaim);
>  				} else {
> @@ -579,13 +612,11 @@ runMALdataflow(Client cntxt, MalBlkPtr m
>  
>  	assert(stoppc > startpc);
>  
> -	/* check existence of workers */
> -	if (workers[0] == 0)
> -		DFLOWinitialize();
> -	assert(workers[0]);
> -	assert(todo);
> -
>  	flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
> +
> +	DFLOWinitialize(flow, stoppc- startpc +1);
> +	assert(flow->todo);
> +	assert(flow->done);
>  
>  	flow->cntxt = cntxt;
>  	flow->mb = mb;
> @@ -596,9 +627,6 @@ runMALdataflow(Client cntxt, MalBlkPtr m
>  	flow->start = startpc + 1;
>  	flow->stop = stoppc;
>  
> -	MT_lock_init(&flow->flowlock, "DFLOWworker");
> -	flow->done = q_create(stoppc- startpc+1);
> -
>  	flow->status = (FlowEvent)GDKzalloc((stoppc - startpc + 1) *
>  	sizeof(FlowEventRec));
>  	size = DFLOWgraphSize(mb, startpc, stoppc);
>  	size += stoppc - startpc;
> @@ -608,11 +636,9 @@ runMALdataflow(Client cntxt, MalBlkPtr m
>  
>  	ret = DFLOWscheduler(flow);
>  
> -	GDKfree(flow->status);
> -	GDKfree(flow->edges);
> -	GDKfree(flow->nodes);
> -	q_destroy(flow->done);
> -	MT_lock_destroy(&flow->flowlock);
> -	GDKfree(flow);
> +	if( ret == MAL_SUCCEED)
> +		ret = DFLOWfinalize(flow);
> +	else (void)
> +		DFLOWfinalize(flow);
>  	return ret;
>  }
> _______________________________________________
> checkin-list mailing list
> checkin-list at monetdb.org
> http://mail.monetdb.org/mailman/listinfo/checkin-list
> 
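
For readers who just want the gist of the pattern described in the log
message above: each dataflow block now owns its own todo queue and its own
worker group, and DFLOWfinalize stops the workers by enqueueing one sentinel
(NULL) entry per worker. Below is a minimal stand-alone sketch of that idea
in plain C with pthreads; it deliberately does not use MonetDB's
MT_/queue/GDK API, and the names (queue, dataflow, worker, NWORKERS) are
illustrative only.

    /* Sketch only: a per-"dataflow" worker group fed from a private queue,
     * shut down via one NULL sentinel per worker.  Not MonetDB code. */
    #include <pthread.h>
    #include <stdio.h>

    #define NWORKERS 4
    #define QSIZE    64

    typedef struct {
        void *data[QSIZE];
        int head, tail;                 /* monotonically growing counters */
        pthread_mutex_t lock;
        pthread_cond_t nonempty;
    } queue;

    static void q_init(queue *q) {
        q->head = q->tail = 0;
        pthread_mutex_init(&q->lock, NULL);
        pthread_cond_init(&q->nonempty, NULL);
    }

    static void q_enqueue(queue *q, void *d) {
        pthread_mutex_lock(&q->lock);
        q->data[q->tail++ % QSIZE] = d; /* sketch: assumes < QSIZE pending */
        pthread_cond_signal(&q->nonempty);
        pthread_mutex_unlock(&q->lock);
    }

    static void *q_dequeue(queue *q) {
        void *d;
        pthread_mutex_lock(&q->lock);
        while (q->head == q->tail)      /* block until something is queued */
            pthread_cond_wait(&q->nonempty, &q->lock);
        d = q->data[q->head++ % QSIZE];
        pthread_mutex_unlock(&q->lock);
        return d;
    }

    /* one "dataflow block": a private todo queue plus its own thread group */
    typedef struct {
        queue todo;
        pthread_t workers[NWORKERS];
    } dataflow;

    static void *worker(void *arg) {
        dataflow *flow = arg;
        for (;;) {
            void *fe = q_dequeue(&flow->todo);
            if (fe == NULL)             /* sentinel: this worker terminates */
                break;
            printf("executing %s\n", (char *) fe);
        }
        return NULL;
    }

    int main(void) {
        dataflow flow;
        int i;

        q_init(&flow.todo);
        for (i = 0; i < NWORKERS; i++)  /* thread group created per flow */
            pthread_create(&flow.workers[i], NULL, worker, &flow);

        q_enqueue(&flow.todo, "X := 1;");      /* schedule some "instructions" */
        q_enqueue(&flow.todo, "Y := X + 1;");

        for (i = 0; i < NWORKERS; i++)  /* one sentinel per worker ... */
            q_enqueue(&flow.todo, NULL);
        for (i = 0; i < NWORKERS; i++)  /* ... then wait for them to exit */
            pthread_join(flow.workers[i], NULL);
        return 0;
    }

One difference worth noting: the sketch simply joins the workers after the
sentinels, whereas the changeset's DFLOWfinalize marks terminated workers in
flow->workers[] and polls that array with a 1 ms sleep, giving up after 5000
rounds.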

-- 
| Stefan.Manegold at CWI.nl | DB Architectures   (DA) |
| www.CWI.nl/~manegold/  | Science Park 123 (L321) |
| +31 (0)20 592-4212     | 1098 XG Amsterdam  (NL) |



