[Monetdb-developers] [Monetdb-checkins] MonetDB5/src/scheduler run_octopus.mx, , 1.17, 1.18

Romulo Goncalves R.A.Goncalves at cwi.nl
Tue Apr 14 13:52:50 CEST 2009


It was this check-in that broke the compilation in SQL.
Martin forgot to remove the calls to
OCTOPUSdrop

 From sql/src/backend/monet5/sql.mx

Romulo

Martin Kersten wrote:
> Update of /cvsroot/monetdb/MonetDB5/src/scheduler
> In directory 23jxhf1.ch3.sourceforge.com:/tmp/cvs-serv21275
> 
> Modified Files:
> 	run_octopus.mx 
> Log Message:
> A new round for the octopus scheduler. Code still in testing phase.
> 
> 
> U run_octopus.mx
> Index: run_octopus.mx
> ===================================================================
> RCS file: /cvsroot/monetdb/MonetDB5/src/scheduler/run_octopus.mx,v
> retrieving revision 1.17
> retrieving revision 1.18
> diff -u -d -r1.17 -r1.18
> --- run_octopus.mx	29 Jan 2009 18:17:10 -0000	1.17
> +++ run_octopus.mx	13 Apr 2009 14:23:01 -0000	1.18
> @@ -30,17 +30,17 @@
>  re-directing requests to multiple sites. If there are no sites known,
>  then the code is executed linearly as is.
>  
> -The scheduler runs all tentacles asynchronously.
> +The scheduler runs all tentacles asynchronously if possible.
>  To make our live easier, we assume that all tentacles are
>  grouped together in a guarded block as follows:
>  
>  @verbatim
> -barrier (parallel,a):= scheduler.octopus(timeout);
> -a:= octopus.tentacle_1();
> +barrier (parallel,version):= scheduler.octopus(timeout);
> +a:= octopus.tentacle_1(sitename,fcnname,version);
>  ...
> -b:= octopus.tentacle_n();
> -a:= mat.pack(a,...,b);
> -exit (parallel,a);
> +b:= octopus.tentacle_n(sitename,fcnname,version);
> +exit (parallel,version);
> +z:= mat.pack(a,...,b);
>  @end verbatim
>  
>  This way the MAL flow of control simplifies skipping to the end
> @@ -50,27 +50,17 @@
>  Allowing MAL instructions inbetween complicates our work,
>  because it would mean that we have to do a flow analysis.
>  
> -To make this work the scheduler needs a list of database worker.
> -For the time being, this is an explicitly administered list here. 
> -When the octopus scheduling is called, we check the connection with
> -the remote site. If it is down, it is re-activated using Merovingian.
> -
> +To make this work the scheduler needs a list of databases to play with.
> +For the time being this consists of all the database known
> +and ending with the phrase 'sea'.
> +This list is obtained through the remote module using the
> +support of Merovingian. The default is to use the local
> +database as a target.
>  @{
>  @mal
> -pattern scheduler.octopus(timeout:int)(:bit, :bat[:any_1,:any_2])
> +pattern scheduler.octopus(t:int)(:bit,version:int)
>  address OCTOPUSrun
> -comment "Run the program block in parallel, but don't wait longer then t seconds";
> -
> -pattern scheduler.worker(dbnme:str, usr:str, pw:str)
> -address OCTOPUSworker
> -comment "Add a new worker to the known list ";
> -pattern scheduler.worker(dbnme:str, usr:str, pw:str, host:str, port:int)
> -address OCTOPUSworker
> -comment "Add a worker site to the known list ";
> -
> -pattern scheduler.drop(dbnme:str)
> -address OCTOPUSdrop
> -comment "Remove a worker from the list";
> +comment "Run the program block in parallel, but don't wait longer then t seconds. Also fix a consistent database version.";
>  @h
>  #ifndef _RUN_OCTOPUS
>  #define _RUN_OCTOPUS
> @@ -78,7 +68,7 @@
>  #include "mal_instruction.h"
>  #include "mal_client.h"
>  
> -/*#define DEBUG_RUN_OCTOPUS 		to trace processing */
> +#define DEBUG_RUN_OCTOPUS 		/* to trace processing */
>  
>  #ifdef WIN32
>  #ifndef LIBRUN_OCTOPUS
> @@ -91,8 +81,6 @@
>  #endif
>  
>  octopus_export str OCTOPUSrun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
> -octopus_export str OCTOPUSworker(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
> -octopus_export str OCTOPUSdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
>  #endif /* MAL_RUN_OCTOPUS */
>  
>  @+ Octopus scheduling implementation
> @@ -108,155 +96,156 @@
>  #include "remote.h"
>  #include "alarm.h"
>  
> -#define SITEasleep	0
> -#define SITElocal	1
> -#define SITEremote	2
> +typedef struct REGMAL{
> +	str fcn;
> +	struct REGMAL *nxt;
> +} *Registry;
>  
>  typedef struct {
> -	str alias;
> -	str db;	/* connection parameters */
> +	str name;
>  	str usr;
> -	str pw;
> -	str host;	/* used when merovigian is not running */
> -	int port;
> -	int status;	/* asleep, local, remote */
> -} Site;
> +	str pwd;
> +	Registry nxt;	/* list of registered queries */
> +} Sea;
>  
>  #define MAXSITES 2048	/* should become dynamic at some point */
> -static Site *sites;
> -static int nrsites = 0;
> +static Sea sea[MAXSITES];
> +static int nrsea = 0;
>  
>  static str
>  OCTOPUSdiscover(Client cntxt){
>  	bat b1 = 0, b2 = 0;
> -	BAT *b;
> +	BAT *l1, *l2;
> +	BUN p,q;
>  	str msg = MAL_SUCCEED;
> +	BATiter bi;
>  
> -	(void) cntxt;
> -	(void) b2;
> +	sea[nrsea].usr = GDKstrdup("monetdb");
> +	sea[nrsea].pwd = GDKstrdup("monetdb");
> +	sea[nrsea++].name= GDKstrdup(GDKgetenv("gdk_dbname"));
>  	/* determine if sites are reachable */
>  	msg = RMTgetList(&b1,&b2);
>  	if ( msg != MAL_SUCCEED)
>  		return msg;
> -	b = BATdescriptor(b1);
> -	if ( b == NULL)
> +	l1 = BATdescriptor(b1);
> +	if ( l1 == NULL)
>  		throw(MAL,"octopus.discover","No database list available");
> -	BBPunfix(b1);
> +	l2 = BATdescriptor(b2);
> +	if ( l2 == NULL){
> +		BBPreleaseref(b1);
> +		throw(MAL,"octopus.discover","No database list available");
> +	}
> +	/* add the databases to the working set */
> +	bi= bat_iterator(l1);
> +	BATloop(l1,p,q){
> +		str t= (str) BUNtail(bi,p);
> +
> +		if (nrsea ==MAXSITES) break;
> +		if (strlen(t) >= 3 && strcmp("sea", t+strlen(t)-3) == 0){
> +			sea[nrsea].usr = GDKstrdup("monetdb");
> +			sea[nrsea].pwd = GDKstrdup("monetdb");
> +			sea[nrsea++].name= GDKstrdup(t);
> +#ifdef DEBUG_RUN_OCTOPUS
> +		stream_printf(cntxt->fdout,"Found site %s\n",t);
> +#else
> +		(void) cntxt;
> +#endif
> +		}
> +	}
> +#ifdef DEBUG_RUN_OCTOPUS
> +		stream_printf(cntxt->fdout,"Seas %d\n",nrsea);
> +#endif
> +	BBPreleaseref(b1);
> +	BBPreleaseref(b2);
>  	return MAL_SUCCEED;
>  }
>  
>  @-
> -The replica is identified by database name. The host and port
> -should address a merovingian to ensure the database instance is
> -started. The default is to contact the local merovingian at 
> -default port 50000.
> +We first register the tentacle at all sites and keep
> +a list of those already sent.
>  @c
> -str
> -OCTOPUSworker(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
> +static int
> +OCTOPUSfind(Sea s, str qry){
> +	Registry r;
> +	for ( r= s.nxt; r; r= r->nxt)
> +	if ( strcmp(qry, r->fcn)==0)
> +		return 1;
> +	return 0;
> +}
> +
> +@-
> +The functions called by the octopus.exec_qry are to be
> +registered at all sites.
> +@c
> +static str
> +OCTOPUSregister(Client cntxt, MalBlkPtr mb, InstrPtr p)
>  {
> -	int idx;
> +	int i;
> +	str conn, fname, msg = MAL_SUCCEED;
> +
> +	fname = getVarConstant(mb,getArg(p,2)).val.sval;
> +	for ( i= 0; i< nrsea; i++){
> +		msg= RMTconnect(&conn, &sea[i].name, &sea[i].usr, &sea[i].pwd);
> +		if (msg ){
> +			stream_printf(cntxt->fdout,"!%s\n",msg);
> +			GDKfree(msg);
> +			msg = NULL;
> +			continue;
> +		}
> +		if( !OCTOPUSfind(sea[i], fname) ){
> +			msg = RMTregisterInternal(cntxt, conn, octopusRef, fname);
>  
> -	(void) mb;
> -	if (nrsites == MAXSITES)
> -		throw(MAL,"scheduler.worker","Too many worker");
> -	mal_set_lock(mal_contextLock,"scheduler.worker");
> -	if (nrsites == 0)
> -		sites = (Site *) GDKzalloc(sizeof(Site) * MAXSITES);
> -	idx = nrsites++;
> -	sites[idx].alias = NULL;
> -	sites[idx].db = GDKstrdup(*(str*) getArgReference(stk,pci,1));
> -	sites[idx].usr = GDKstrdup(*(str*) getArgReference(stk,pci,2));
> -	sites[idx].pw = GDKstrdup(*(str*) getArgReference(stk,pci,3));
> -	if (pci->argc > 4){
> -		sites[idx].host = GDKstrdup(*(str*) getArgReference(stk,pci,4));
> -		sites[idx].port = *(int*) getArgReference(stk,pci,5);
> -	} else {
> -		sites[idx].host = GDKstrdup("localhost");
> -		sites[idx].port = 50000;
> -	}
> -	mal_unset_lock(mal_contextLock,"scheduler.worker");
>  #ifdef DEBUG_RUN_OCTOPUS
> -	stream_printf(cntxt->fdout,"# added worker %s %s %s %s\n", 
> -		sites[idx].alias, sites[idx].usr, sites[idx].pw);
> -		sites[idx].db, sites[idx].usr, sites[idx].pw);
> +			stream_printf(cntxt->fdout,"octopus.%s registered at site %s\n",
> +				fname,sea[i].name);
> +			stream_printf(cntxt->fdout,"reply: %s\n",msg?msg:"ok");
>  #else
> -	(void) cntxt;
> +			(void) cntxt;
>  #endif
> -	return MAL_SUCCEED;
> -}
> -str
> -OCTOPUSdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
> -{
> -	int i,j;
> -	str alias = *(str*) getArgReference(stk,pci,1);
> -
> -	(void) cntxt;
> -	(void) mb;
> -	mal_set_lock(mal_contextLock,"scheduler.drop");
> -	for (i=j=0; i<nrsites; i++){
> -		if( strcmp(sites[i].alias, alias) ==0) {
> -			GDKfree(sites[i].alias);
> -			GDKfree(sites[i].db);
> -			GDKfree(sites[i].usr);
> -			GDKfree(sites[i].pw);
> -			GDKfree(sites[i].host);
> -			continue;
> +			if ( msg == MAL_SUCCEED){
> +				Registry r= (Registry) GDKzalloc(sizeof(struct REGMAL));
> +				r->fcn = GDKstrdup(getFunctionId(p));
> +				r->nxt = sea[i].nxt;
> +				sea[i].nxt = r;
> +			}
>  		}
> -		sites[j++] = sites[i];
>  	}
> -	nrsites = j;
> -	mal_unset_lock(mal_contextLock,"scheduler.drop");
> -	if ( i == j )
> -		throw(MAL,"scheduler.drop","Site not found");
> -	return MAL_SUCCEED;
> +	GDKfree(conn);
> +	return msg;
>  }
>  @-
> -The policy to check for sites is a multiphase phase process.
> -First, we try to re-use a site where the operation was ran before.
> -If not available, we select a non-used worker.
> -If all this fails, we pick a random site to execute the plan.
> +The work division looks at the system opportunities and
> +replaces the target site in all instructions.
> +The first policy is to simply perform round robin.
> +The more advanced way is to negotiat with the remote sites.
>  @c
>  static str
> -OCTOPUSexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
> +OCTOPUSworkdivision(Client cntxt, MalBlkPtr mb, int pc)
>  {
> -	int i=0, tries= nrsites * 2;
> +	static int rr=0;
>  	str msg = MAL_SUCCEED;
> +	InstrPtr p;
> +	ValPtr cst;
>  
> -redo:
> +	for (; pc< mb->stop; pc++){
> +		if ( nrsea >1 && rr == 0) rr++; /* ignore default */
> +		p= getInstrPtr(mb,pc);
> +		if ( p->barrier == EXITsymbol)
> +			break;
> +		assert( isVarConstant(mb, getArg(p,1)) );
> +		cst = &getVarConstant(mb, getArg(p,1));
> +		if( cst->val.sval)
> +			GDKfree(cst->val.sval);
> +		cst->val.sval= GDKstrdup(sea[rr].name);
> +		cst->len = strlen(cst->val.sval);
>  #ifdef DEBUG_RUN_OCTOPUS
> -	stream_printf(cntxt->fdout,"octopus.exec site selected %d\n",i);
> +		stream_printf(cntxt->fdout,"octopus site selected %s\n",sea[rr].name);
> +		printInstruction(cntxt->fdout,mb,0,p,LIST_MAL_STMT);
>  #else
> -	(void) cntxt;
> -#endif
> -
> -	/* register the plan remotely */
> -	msg = RMTregisterInternal(cntxt, sites[i].alias, 
> -		getModuleId(pci), getFunctionId(pci));
> -
> -	/* ignore a duplicate definition */
> -	if (msg != MAL_SUCCEED && !strstr(msg,"Function already defined")){
> -#ifdef DEBUG_RUN_OCTOPUS
> -		stream_printf(cntxt->fdout,"octopus.exec failed to register plan %s.%s at site %s\n",getModuleId(pci),getFunctionId(pci),sites[i].alias);
> -		stream_printf(cntxt->fdout,"reply: %s\n",msg);
> -#endif
> -		if (--tries <= 0)
> -			return msg;
> -		goto redo;
> -	}
> -
> -	/* execute the plan as an independent process thread if it is local*/
> -	/* otherwise activate it on the remote site passing parameters as well */
> -	msg =runMALprocess(cntxt,mb,stk, getPC(mb,pci), getPC(mb,pci)+1);
> -	if ( msg != MAL_SUCCEED){
> -#ifdef DEBUG_RUN_OCTOPUS
> -		stream_printf(cntxt->fdout,"octopus.exec failed to run remote plan\n");
> +		(void) cntxt;
>  #endif
> -		if (--tries <= 0)
> -			return msg;
> -		goto redo;
> +		rr= (rr+1) % nrsea;
>  	}
> -
> -	/* if it fails, we need to find another site */
>  	return msg;
>  }
>  @-
> @@ -270,37 +259,52 @@
>  We should be careful in accessing a site that runs out
>  of clients or any failure. It may cause the scheduler to
>  wait forever.
> +
> +The database version should indicate to the tentacles
> +if it is time to refresh their caches. 
> +It should be obtained from the recycler where we
> +know when updates have been taken place.
>  @c
>  str
>  OCTOPUSrun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p)
>  {
>  	int *res = (int*) getArgReference(stk,p,0);
> +	int *version = (int*) getArgReference(stk,p,1);
>  	int timeout = *(int*) getArgReference(stk,p,2);
> -	int j,fnd, i = getPC(mb,p);
> +	int j,fnd, i = getPC(mb,p), threadcnt=0;
>  	str msg = MAL_SUCCEED;
>  	*res = 0;	/* skip the block */
>  
> -	if ( OCTOPUSdiscover(cntxt) == 0 ){
> +	*version = 0;
> +
> +	if ( (msg= OCTOPUSdiscover(cntxt))  ){
>  #ifdef DEBUG_RUN_OCTOPUS
>  		stream_printf(cntxt->fdout,"#Run in local serial mode\n");
>  #endif
>  		*res = 1;
> -		return MAL_SUCCEED; 
> +		return msg; 
> +	}
> +@-
> +Register the tentacle functions at all sites.
> +@c
> +	if (nrsea > 1) {
> +		for (j= i+1; j<mb->stop ; j++){
> +			p= getInstrPtr(mb,j);
> +			if ( p->barrier == EXITsymbol )
> +				break;
> +			msg= OCTOPUSregister(cntxt,mb,p);
> +			if ( msg )
> +				return msg;
> +		}
>  	}
> +	msg= OCTOPUSworkdivision(cntxt,mb,i+1);
> +	if ( msg )
> +		return msg;
> +
>  	/* do the actual parallel work */
>  	for (i++; i<mb->stop && msg == MAL_SUCCEED; i++){
>  		p= getInstrPtr(mb,i);
> -		/* don't do it remote if we need arguments */
> -		if ( p->retc != p->argc){
> -#ifdef DEBUG_RUN_OCTOPUS
> -		stream_printf(cntxt->fdout,"#Run in local serial mode due to arguments\n");
> -#endif
> -			*res = 1;
> -			return MAL_SUCCEED; 
> -		}
> -		if ( p->barrier == EXITsymbol )
> -			break;
> -		if ( getModuleId(p) == matRef && getFunctionId(p) == packRef){
> +		if ( p->barrier == EXITsymbol ){
>  			/* collect the results */
>  			do{
>  				fnd = 0;
> @@ -311,10 +315,10 @@
>  #endif
>  				MT_sleep_ms(1000);
>  				timeout--;
> -			} while ( fnd < p->argc-3 && timeout > 0 );
> +			} while ( fnd < threadcnt && timeout > 0 );
>  			if (timeout <= 0)
> -				throw(MAL,"scheduler.pack","Execution time out");
> -			return MATpackInternal(stk,p,1);
> +				throw(MAL,"scheduler.octopus","Execution time out");
> +			break;
>  		}
>  		if ( getModuleId(p) != octopusRef)
>  			throw(MAL,"scheduler.octopus","tentacle expected");
> @@ -323,7 +327,8 @@
>  #else
>  	(void) cntxt;
>  #endif
> -		msg = OCTOPUSexec(cntxt,mb,stk,p);
> +		msg =runMALprocess(cntxt,mb,stk, getPC(mb,p), getPC(mb,p)+1);
> +		threadcnt++;
>  	}
>  	return msg; 
>  }
> 
> 
> ------------------------------------------------------------------------------
> This SF.net email is sponsored by:
> High Quality Requirements in a Collaborative Environment.
> Download a free trial of Rational Requirements Composer Now!
> http://p.sf.net/sfu/www-ibm-com
> _______________________________________________
> Monetdb-checkins mailing list
> Monetdb-checkins at lists.sourceforge.net
> https://lists.sourceforge.net/lists/listinfo/monetdb-checkins




More information about the developers-list mailing list