[Monetdb-developers] [Monetdb-checkins] MonetDB5/src/modules/mal bpm.mx, 1.23, 1.24

Romulo Goncalves R.A.Goncalves at cwi.nl
Tue Feb 6 15:24:46 CET 2007


Martin Kersten wrote:
> Update of /cvsroot/monetdb/MonetDB5/src/modules/mal
> In directory sc8-pr-cvs7.sourceforge.net:/tmp/cvs-serv29523
> 
> Modified Files:
> 	bpm.mx 
> Log Message:
> 70% of the code needed to handle partitioned BATs in a single server.
> The code is not tested yet (except for compilation errors), but should
> give a head start for those in-experienced in working with MonetDB source code.
> 
> 
> Index: bpm.mx
> ===================================================================
> RCS file: /cvsroot/monetdb/MonetDB5/src/modules/mal/bpm.mx,v
> retrieving revision 1.23
> retrieving revision 1.24
> diff -u -d -r1.23 -r1.24
> --- bpm.mx	4 Feb 2007 22:55:32 -0000	1.23
> +++ bpm.mx	6 Feb 2007 07:16:13 -0000	1.24
> @@ -229,7 +229,7 @@
>  address BPMdestroy
>  comment "Destroy the BAT partition box";
>  
> -command deposit(nme:str,b:bat[:oid,:any_2]) :void
> +command deposit(nme:str,b:bat[:oid,:any_2]) :bat[:oid,:any_2]
>  address BPMdeposit
>  comment "Create a new partitioned BAT by name";
>  
> @@ -238,36 +238,36 @@
>  If the alias BAT denotes an existing partition, it is
>  further broken into pieces.
>  @mal
> -command rangePartition(pb:bat[:oid,:any_2], 
> -		ll:oid, lh:oid, rl:any_2, rh:any_2):void
> +command rangePartition(pb:bat[:any_1,:any_2], rl:any_2, rh:any_2):void
>  address BPMrange
>  comment "Create a range partition on a BAT";
> -command rangePartition(pb:bat[:oid,:any_2], pv:bat[:oid,:any_2]):void
> +command rangePartition(pb:bat[:any_1,:any_2], pv:bat[:oid,:any_2]):void
>  address BPMrangeVector
>  comment "Create the partitions based on a range vector";
>  
> -command hashPartition(pb:bat[:oid,:any_2], slots:int):void
> +command hashPartition(pb:bat[:any_1,:any_2], slots:int):void
>  address BPMhash
>  comment "Create a hash partition on a BAT";
> -command derivePartition(pb:bat[:oid,:any_2], src:bat[:oid,:any_2]):bat[:oid,:any_2]
> +command derivePartition(pb:bat[:any_1,:any_2], 
> +	src:bat[:any_1,:any_2]):bat[:any_1,:any_2]
>  address BPMderived
>  comment "Create a derived fragmentation over the head using src.";
>  
> -command take(pb:str, b:bat[:oid,:any_2]):bat[:oid,:any_2]
> +command take(pb:str, b:bat[:any_1,:any_2]):bat[:any_1,:any_2]
>  address BPMtake
>  comment "Retrieve the alias for a partitioned BAT";
> -command take(alias:bat[:oid,:any_2],index:int) :bat[:oid,:any_2]
> +command take(alias:bat[:any_1,:any_2],index:int) :bat[:any_1,:any_2]
>  address BPMtakePartition
>  comment "Retrieve a single component of a partitioned BAT by index";
>  
> -command insert(pb:bat[:oid,:any_2],b:bat[:oid,:any_2]) :void
> +command insert(pb:bat[:any_1,:any_2],b:bat[:any_1,:any_2]) :void
>  address BPMinsert
>  comment "Insert elements into the BAT partitions";
> -command delete(pb:bat[:oid,:any_2],b:bat[:oid,:any_2]) :void
> +command delete(pb:bat[:any_1,:any_2],b:bat[:any_1,:any_2]) :void
>  address BPMdelete
>  comment "Delete elements from the BAT partitions";
> -command replace(pb:bat[:oid,:any_2],old:bat[:oid,:any_2],
> -		nwe:bat[:oid,:any_2]) :void
> +command replace(pb:bat[:any_1,:any_2],old:bat[:any_1,:any_2],
> +		nwe:bat[:any_1,:any_2]) :void
>  address BPMreplace
>  comment "Replace the content of the BAT partitions";
>  
> @@ -275,7 +275,7 @@
>  address BPMgetNames
>  comment "Retrieve the names of all known partitioned BATs";
>  
> -command discard(alias:bat[:oid,:any_2]) :void
> +command discard(alias:bat[:any_1,:any_2]) :void
>  address BPMdiscard
>  comment "Release a partitioned BAT from the box";
>  
> @@ -285,39 +285,39 @@
>  Wherever possible skipping elements that don't qualify
>  the bounds given for the head.
>  @mal
> -command newIterator(grp:bat[:oid,:any_2]):bat[:oid,:any_2]
> +command newIterator(grp:bat[:any_1,:any_2]):bat[:any_1,:any_2]
>  address BPMnewIterator
>  comment "Create an iterator over the BAT partitions.";
>  
> -command newIterator(grp:bat[:oid,:any_2],first:any_2,last:any_2)
> -		:bat[:oid,:any_2]
> +command newIterator(grp:bat[:any_1,:any_2],first:any_2,last:any_2)
> +		:bat[:any_1,:any_2]
>  address BPMnewIteratorRng
>  comment "Create an iterator over the BAT partitions.";
>  
> -command newIterator(pb:bat[:oid,:any_2], first:oid,last:oid,
> +command newIterator(pb:bat[:any_1,:any_2], first:any_1,last:any_1,
>  		vlow:any_2, vhgh:any_2) :bat[:oid,:any_2]
>  address BPMnewIteratorRng4
>  comment "Create an iterator over the BAT partitions.";
>  
> -command hasMoreElements(grp:bat[:oid,:any_2]) :bat[:oid,:any_2]
> +command hasMoreElements(grp:bat[:any_1,:any_2]) :bat[:any_1,:any_2]
>  address BPMhasMoreElements
>  comment "Localize the next partition for processing.";
>  
> -command hasMoreElements(pb:bat[:oid,:any_2], 
> -		low:any_2, hgh:any_2) :bat[:oid,:any_2]
> +command hasMoreElements(pb:bat[:any_1,:any_2], 
> +		low:any_2, hgh:any_2) :bat[:any_1,:any_2]
>  address BPMhasMoreElementsRng2
>  comment "Localize the next partition for processing.";
> -command hasMoreElements(pb:bat[:oid,:any_2], first:oid,last:oid,
> -		vlow:any_2, vhgh:any_2) :bat[:oid,:any_2]
> +command hasMoreElements(pb:bat[:any_1,:any_2], first:any_1,last:any_1,
> +		vlow:any_2, vhgh:any_2) :bat[:any_1,:any_2]
>  address BPMhasMoreElementsRng4
>  comment "Localize the next partition for processing.";
>  
> -command getDimension(b:bat[:oid,:any_2])(first:oid,last:oid, 
> +command getDimension(b:bat[:any_1,:any_2])(first:any_1,last:any_1, 
>  	vlow:any_2, vhgh:any_2)
>  address BPMgetDimension
>  comment "Obtain the partition boundary values.";
>  
> -command dump(alias:bat[:oid,:any_2])
> +command dump(alias:bat[:any_1,:any_2])
>  address BPMdumpAlias
>  comment "Give the details of the partition tree";
>  command dump()
> @@ -403,6 +403,7 @@
>  @c
>  #include "mal_config.h"
>  #include "bpm.h"
> +#include "bat5.h"
>  
>  @-
>  Every partition is the result of at most four operations:
> @@ -430,7 +431,6 @@
>  	int nxt, prv;	/* list of all partitions*/
>  } *Partition, PartitionRec;
>  
> -/* NOT USED YET
>  static void BPMprintRecord(stream *f, Partition p){
>  	stream_printf(f,"partition: %s alias %d bid %d index %d ",
>  		p->name, p->alias, p->bid, p->index);
> @@ -438,7 +438,6 @@
>  	stream_printf(f,"hbucket %d tbucket %d ", p->hbucket,p->tbucket);
>  	stream_printf(f,"nxt %d prv %d\n", p->nxt,p->prv);
>  }
> -*/
>  @-
>  The number of partitioned BATs is considered low and
>  a straight forward array with linear search seems
> @@ -470,7 +469,6 @@
>  		}
>  	return 0;
>  }
> -/* NOT USED YET
>  static Partition getPartition(int bid, int idx){
>  	return bpmcat+getPartitionIndex(bid,idx);
>  }
> @@ -478,6 +476,7 @@
>  	return getPartition(bid,0);
>  }
>  
> +/* NOT USED YET
>  static void delAlias(int bid){
>  	int i;
>  	i= getPartitionIndex(bid,0);
> @@ -595,33 +594,121 @@
>  
>  	BBPkeepref(*ret= bn->batCacheid);
>  	BBPunfix(b->batCacheid);
> -	throw(MAL, "bpm.deposit","NYI");
> +	return MAL_SUCCEED;
>  }
> -
> + at -
> +Range partitioning simply runs through all partitions
> +and creates new ones, keeping the partitions in order.
> +The 'critical' part is to detect overlap of the range
> +and the bounds of the partition.
> + at c
>  str
>  BPMrange(int *ret, int *bid, ptr *low, ptr *hgh){
>  	BAT *b;
> +	int i, tpe;
> +	int (*cmp) (ptr, ptr);
> +	ptr nilptr;
> +	int low_nil, hgh_nil;
> +	Partition p;
> +
>  	BPMopen();
> -	@:getBATdescriptor(bid,b,"bpm.range")@
> -	/* determine the partitioning scheme */
> -	(void) b;
> -	(void) low;
> -	(void) hgh;
> +
> +	p= getAlias(*bid);
> +	if( p==0){
> +		throw(MAL, "bpm.range","Partition not known");
> +	}
> +	i= p->index;
> +	@:getBATdescriptor(&bpmcat[i].bid,b,"bpm.range")@
> +	if( b == 0)
> +		throw(MAL,"bpm.range","Could not access BAT");
> +
> +
> +	/* get the comparison function */
> +	tpe= *bid >0? b->htype:b->ttype;
> +	cmp= BATatoms[tpe].atomCmp;
> +	nilptr = ATOMnilptr(tpe);
> +	low_nil = ((*cmp) (*low, nilptr) == 0);
> +	hgh_nil = ((*cmp) (*hgh, nilptr) == 0);
> +
> +	BBPunfix(b->batCacheid);	/* don't need it anymore */
> +
> +	for( ; i ; i= bpmcat[i].nxt){
> +		@:getBATdescriptor(&bpmcat[i].bid,b,"bpm.range")@
> +
> +		/* determine overlag by excluding outliers */
> +		if( *bid > 0 ){
> +			if( (*cmp) ((ptr) &bpmcat[i].thgh, *low) < 0 ||
> +				(*cmp) ((ptr) &bpmcat[i].tlow, *hgh) > 0 
> +			){
> +				/* this fragment need not be split */
> +			} else {
> +				/* break the fragment */
> +#ifdef _DEBUG_BPM_
> +				stream_printf(GDKout, "break fragment %d\n",i);
> +#endif
> +			}
> +		} else {
> +			/* use reversed bat */
> +			if( (*cmp) ((ptr) &bpmcat[i].hhgh, *low) < 0 ||
> +				(*cmp) ((ptr) &bpmcat[i].hlow, *hgh) > 0 
> +			){
> +				/* this fragment need not be split */
> +			} else {
> +#ifdef _DEBUG_BPM_
> +				stream_printf(GDKout, "break fragment %d\n",i);
> +#endif
> +				/* break the fragment */
> +			}
> +		}
> +		BBPunfix(b->batCacheid);
> +	}
>  	*ret= 0;
> -	throw(MAL, "bpm.range","NYI");
> +	return MAL_SUCCEED;
>  }
> + at -
> +The vector approach could either use a single column
> +of ordered split points, or a double column with (low,hgh)
> +value pairs. (STILL TO BE DECIDED)
> +We should ensure that the range vector is sorted first.
> + at c
>  str
>  BPMrangeVector(int *ret, int *bid, int *pv)
>  {
> -	BAT *b;
> +	BAT *b, *bpv;
> +	BUN p,q;
> +	/* ptr ph=0; */
> +
>  	BPMopen();
>  	@:getBATdescriptor(bid,b,"bpm.range")@
> -	/* determine the partitioning scheme */
> -	(void) b;
> -	(void) bid;
> -	(void) pv;
> -	*ret= 0;
> -	throw(MAL, "bpm.range","NYI");
> +	if( b == 0)
> +		throw(MAL,"bpm.range","Cannot access BAT");
> +
> +	@:getBATdescriptor(pv,bpv,"bpm.range")@
> +	if( bpv == 0){
> +		BBPunfix(b->batCacheid);
> +		throw(MAL,"bpm.range","Cannot access BAT");
> +	}
> +	/* Apply the range partition using pairs */
> +	BATloop(b,p,q){
> +		ptr ph= BUNhead(b,p);
> +		ptr pt= BUNtail(b,p);
> +		BPMrange(ret, bid, &ph, &pt);
> +	}
> +	/* Apply the range partition using single column
> +	BATloop(b,p,q){
> +		if( ph){
> +			ph= BUNtail(b,p);
> +		} else {
> +			ptr pt= BUNtail(b,p);
> +			BPMrange(ret, bid,&ph,&pt);
> +			ph= BUNtail(b,p);
> +		}
> +	}
> +	 */
> +
> +	BBPunfix(b->batCacheid);
> +	BBPunfix(bpv->batCacheid);
> +	return MAL_SUCCEED;
>  }
>  str
>  BPMhash(int *ret, int *bid, int *slots, int *prime)
> @@ -654,77 +741,140 @@
>  str
>  BPMtake(int *ret, str *nme)
>  {
> +	int i;
>  	BPMopen();
> -	(void) nme;
> -	*ret =0;
> -	throw(MAL, "bpm.take","NYI");
> +	i= getPartitionName(*nme);
> +	if( i== 0){
> +		throw(MAL,"pbm.take","Partitioned BAT does not exist");
> +	}
> +	*ret = bpmcat[i].alias;
> +	return MAL_SUCCEED;
>  }
>  str
>  BPMtakePartition(int *ret, int *bid, int *index)
>  {
> +	BAT *b;
> +	int i;
> +
>  	BPMopen();
> -	(void) bid;
> -	(void) index;
> -	*ret =0;
> -	throw(MAL, "bpm.take","NYI");
> +	i= getPartitionIndex(*bid, *index);
> +	if( i== 0){
> +		throw(MAL,"pbm.take","Partitioned BAT does not exist");
> +	}
> +	@:getBATdescriptor(bid,b,"bpm.take")@
> +	if( b == 0){
> +		throw(MAL,"bpm.take","Cannot access BAT");
> +	}
> +	BBPkeepref(*ret =b->batCacheid);
> +	return MAL_SUCCEED;
>  }
>  @- Updates
> +The update operations simply loop through the partitions
> +and apply the results. We can optimize this by inspection
> +of the range tables, but that is considered relevant in a distributed
> +case.
> +The updates are merely convenient operators, because ideally
> +the optimizer already filters out the partitions of interest.
>  @c
>  str
>  BPMinsert(int *ret, int *bid, int *ins)
>  {
> -	BAT *b;
> +	int i;
> +	str msg= MAL_SUCCEED;
> +	Partition p;
> +
>  	BPMopen();
> -	@:getBATdescriptor(bid,b,"bpm.insert")@
> +	p= getAlias(*bid);
> +	if( p== 0)
> +		throw(MAL,"bpm.insert","Can not alias BAT");
>  	/* distributed the content */
> -	(void) b;
> -	(void) ins;
> +	for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
> +		msg=BKCinsert_bat(ret,&bpmcat[i].bid,ins);
> +	}
>  	*ret= 0;
> -	throw(MAL, "bpm.insert","NYI");
> +	return msg;
>  }
>  
>  str
>  BPMdelete(int *ret, int *bid, int *del)
>  {
> -	BAT *b;
> +	int i;
> +	Partition p;
> +	str msg= MAL_SUCCEED;
> +
>  	BPMopen();
> -	@:getBATdescriptor(bid,b,"bpm.delete")@
> -	/* delete some information from the partitions */
> -	(void) b;
> -	(void) del;
> +	p= getAlias(*bid);
> +	if( p== 0)
> +		throw(MAL,"bpm.insert","Can not alias BAT");
> +	/* distributed the content */
> +	for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
> +		msg=BKCdelete_bat(ret,&bpmcat[i].bid,del);
> +	}
>  	*ret= 0;
> -	throw(MAL, "bpm.delete","NYI");
> +	return msg;
>  }
>  
>  str
>  BPMreplace(int *ret, int *bid, int *ins, int *del)
>  {
> -	BAT *b;
> +	int i;
> +	Partition p;
> +	str msg= MAL_SUCCEED;
> +
>  	BPMopen();
> -	@:getBATdescriptor(bid,b,"bpm.insert")@
> -	/* replace values */
> -	(void) b;
> -	(void) ins;
> -	(void) del;
> +	p= getAlias(*bid);
> +	if( p== 0)
> +		throw(MAL,"bpm.insert","Can not alias BAT");
> +	/* distributed the content */
> +	for( i=p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
> +		msg=BKCdelete_bat(ret,&bpmcat[i].bid,del);
It should be:
msg=BKCdelete_bat(ret,&bpmcat[i].pid,del);

Right?
> +		if( msg == MAL_SUCCEED)
> +			msg=BKCinsert_bat(ret,&bpmcat[i].bid,ins);
It should be:
		msg=BKCinsert_bat(ret,&bpmcat[i].pid,ins);
Right?
> +	}
>  	*ret= 0;
> -	throw(MAL, "bpm.replace","NYI");
> +	return msg;
>  }
>  
>  str
>  BPMgetNames(int *ret)
>  {
> +    BAT *b;
> +    int i;
> +
>  	BPMopen();
> -	*ret =0;
> -	throw(MAL, "bpm.getNames","NYI");
> +    b = BATnew(TYPE_int, TYPE_str, BBPsize);
> +    if (b == 0)
> +        throw(MAL, "catalog.bbpNames", "failed to create BAT");
> +
> +	for(i=1; i<bpmcattop; i++)
> +		BUNins(b, &i, bpmcat[i].name,TRUE);
> +	BBPkeepref(*ret = b->batCacheid);
> +    if (!(b->batDirty&2)) 
> +		b = BATsetaccess(b, BAT_READ);
> +	return MAL_SUCCEED;
>  }
>  
>  str
>  BPMdiscard(int *ret, int *bid)
>  {
> +	Partition p;
> +	int i;
> +	signed char r;
> +	str msg= MAL_SUCCEED;
> +
>  	BPMopen();
> -	(void) bid;
> +	p= getAlias(*bid);
> +	if( p== 0)
> +		throw(MAL,"bpm.discard","Can not alias BAT");
> +	/* discard the content */
> +	for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
> +		msg= BKCdestroy(&r, &bpmcat[i].bid);
It should be:
 > +		msg= BKCdestroy(&ret, &bpmcat[i].bid);
Right?

If I am right, I will fix and commit it. These differences caused
compilation errors on Windows.

Regards,
Romulo




More information about the developers-list mailing list