Re: [Monetdb-developers] [Monetdb-checkins] MonetDB5/src/modules/mal bpm.mx, 1.23, 1.24
Martin Kersten wrote:
Update of /cvsroot/monetdb/MonetDB5/src/modules/mal In directory sc8-pr-cvs7.sourceforge.net:/tmp/cvs-serv29523
Modified Files: bpm.mx Log Message: 70% of the code needed to handle partitioned BATs in a single server. The code is not tested yet (except for compilation errors), but should give a head start for those in-experienced in working with MonetDB source code.
Index: bpm.mx
RCS file: /cvsroot/monetdb/MonetDB5/src/modules/mal/bpm.mx,v retrieving revision 1.23 retrieving revision 1.24 diff -u -d -r1.23 -r1.24 --- bpm.mx 4 Feb 2007 22:55:32 -0000 1.23 +++ bpm.mx 6 Feb 2007 07:16:13 -0000 1.24 @@ -229,7 +229,7 @@ address BPMdestroy comment "Destroy the BAT partition box";
-command deposit(nme:str,b:bat[:oid,:any_2]) :void +command deposit(nme:str,b:bat[:oid,:any_2]) :bat[:oid,:any_2] address BPMdeposit comment "Create a new partitioned BAT by name";
@@ -238,36 +238,36 @@ If the alias BAT denotes an existing partition, it is further broken into pieces. @mal -command rangePartition(pb:bat[:oid,:any_2],
ll:oid, lh:oid, rl:any_2, rh:any_2):void
+command rangePartition(pb:bat[:any_1,:any_2], rl:any_2, rh:any_2):void address BPMrange comment "Create a range partition on a BAT"; -command rangePartition(pb:bat[:oid,:any_2], pv:bat[:oid,:any_2]):void +command rangePartition(pb:bat[:any_1,:any_2], pv:bat[:oid,:any_2]):void address BPMrangeVector comment "Create the partitions based on a range vector";
-command hashPartition(pb:bat[:oid,:any_2], slots:int):void +command hashPartition(pb:bat[:any_1,:any_2], slots:int):void address BPMhash comment "Create a hash partition on a BAT"; -command derivePartition(pb:bat[:oid,:any_2], src:bat[:oid,:any_2]):bat[:oid,:any_2] +command derivePartition(pb:bat[:any_1,:any_2],
- src:bat[:any_1,:any_2]):bat[:any_1,:any_2]
address BPMderived comment "Create a derived fragmentation over the head using src.";
-command take(pb:str, b:bat[:oid,:any_2]):bat[:oid,:any_2] +command take(pb:str, b:bat[:any_1,:any_2]):bat[:any_1,:any_2] address BPMtake comment "Retrieve the alias for a partitioned BAT"; -command take(alias:bat[:oid,:any_2],index:int) :bat[:oid,:any_2] +command take(alias:bat[:any_1,:any_2],index:int) :bat[:any_1,:any_2] address BPMtakePartition comment "Retrieve a single component of a partitioned BAT by index";
-command insert(pb:bat[:oid,:any_2],b:bat[:oid,:any_2]) :void +command insert(pb:bat[:any_1,:any_2],b:bat[:any_1,:any_2]) :void address BPMinsert comment "Insert elements into the BAT partitions"; -command delete(pb:bat[:oid,:any_2],b:bat[:oid,:any_2]) :void +command delete(pb:bat[:any_1,:any_2],b:bat[:any_1,:any_2]) :void address BPMdelete comment "Delete elements from the BAT partitions"; -command replace(pb:bat[:oid,:any_2],old:bat[:oid,:any_2],
nwe:bat[:oid,:any_2]) :void
+command replace(pb:bat[:any_1,:any_2],old:bat[:any_1,:any_2],
nwe:bat[:any_1,:any_2]) :void
address BPMreplace comment "Replace the content of the BAT partitions";
@@ -275,7 +275,7 @@ address BPMgetNames comment "Retrieve the names of all known partitioned BATs";
-command discard(alias:bat[:oid,:any_2]) :void +command discard(alias:bat[:any_1,:any_2]) :void address BPMdiscard comment "Release a partitioned BAT from the box";
@@ -285,39 +285,39 @@ Wherever possible skipping elements that don't qualify the bounds given for the head. @mal -command newIterator(grp:bat[:oid,:any_2]):bat[:oid,:any_2] +command newIterator(grp:bat[:any_1,:any_2]):bat[:any_1,:any_2] address BPMnewIterator comment "Create an iterator over the BAT partitions.";
-command newIterator(grp:bat[:oid,:any_2],first:any_2,last:any_2)
:bat[:oid,:any_2]
+command newIterator(grp:bat[:any_1,:any_2],first:any_2,last:any_2)
:bat[:any_1,:any_2]
address BPMnewIteratorRng comment "Create an iterator over the BAT partitions.";
-command newIterator(pb:bat[:oid,:any_2], first:oid,last:oid, +command newIterator(pb:bat[:any_1,:any_2], first:any_1,last:any_1, vlow:any_2, vhgh:any_2) :bat[:oid,:any_2] address BPMnewIteratorRng4 comment "Create an iterator over the BAT partitions.";
-command hasMoreElements(grp:bat[:oid,:any_2]) :bat[:oid,:any_2] +command hasMoreElements(grp:bat[:any_1,:any_2]) :bat[:any_1,:any_2] address BPMhasMoreElements comment "Localize the next partition for processing.";
-command hasMoreElements(pb:bat[:oid,:any_2],
low:any_2, hgh:any_2) :bat[:oid,:any_2]
+command hasMoreElements(pb:bat[:any_1,:any_2],
low:any_2, hgh:any_2) :bat[:any_1,:any_2]
address BPMhasMoreElementsRng2 comment "Localize the next partition for processing."; -command hasMoreElements(pb:bat[:oid,:any_2], first:oid,last:oid,
vlow:any_2, vhgh:any_2) :bat[:oid,:any_2]
+command hasMoreElements(pb:bat[:any_1,:any_2], first:any_1,last:any_1,
vlow:any_2, vhgh:any_2) :bat[:any_1,:any_2]
address BPMhasMoreElementsRng4 comment "Localize the next partition for processing.";
-command getDimension(b:bat[:oid,:any_2])(first:oid,last:oid, +command getDimension(b:bat[:any_1,:any_2])(first:any_1,last:any_1, vlow:any_2, vhgh:any_2) address BPMgetDimension comment "Obtain the partition boundary values.";
-command dump(alias:bat[:oid,:any_2]) +command dump(alias:bat[:any_1,:any_2]) address BPMdumpAlias comment "Give the details of the partition tree"; command dump() @@ -403,6 +403,7 @@ @c #include "mal_config.h" #include "bpm.h" +#include "bat5.h"
@- Every partition is the result of at most four operations: @@ -430,7 +431,6 @@ int nxt, prv; /* list of all partitions*/ } *Partition, PartitionRec;
-/* NOT USED YET static void BPMprintRecord(stream *f, Partition p){ stream_printf(f,"partition: %s alias %d bid %d index %d ", p->name, p->alias, p->bid, p->index); @@ -438,7 +438,6 @@ stream_printf(f,"hbucket %d tbucket %d ", p->hbucket,p->tbucket); stream_printf(f,"nxt %d prv %d\n", p->nxt,p->prv); } -*/ @- The number of partitioned BATs is considered low and a straight forward array with linear search seems @@ -470,7 +469,6 @@ } return 0; } -/* NOT USED YET static Partition getPartition(int bid, int idx){ return bpmcat+getPartitionIndex(bid,idx); } @@ -478,6 +476,7 @@ return getPartition(bid,0); }
+/* NOT USED YET static void delAlias(int bid){ int i; i= getPartitionIndex(bid,0); @@ -595,33 +594,121 @@
BBPkeepref(*ret= bn->batCacheid); BBPunfix(b->batCacheid);
- throw(MAL, "bpm.deposit","NYI");
- return MAL_SUCCEED;
}
+@- +Range partitioning simply runs through all partitions +and creates new ones, keeping the partitions in order. +The 'critical' part is to detect overlap of the range +and the bounds of the partition. +@c str BPMrange(int *ret, int *bid, ptr *low, ptr *hgh){ BAT *b;
- int i, tpe;
- int (*cmp) (ptr, ptr);
- ptr nilptr;
- int low_nil, hgh_nil;
- Partition p;
- BPMopen();
- @:getBATdescriptor(bid,b,"bpm.range")@
- /* determine the partitioning scheme */
- (void) b;
- (void) low;
- (void) hgh;
- p= getAlias(*bid);
- if( p==0){
throw(MAL, "bpm.range","Partition not known");
- }
- i= p->index;
- @:getBATdescriptor(&bpmcat[i].bid,b,"bpm.range")@
- if( b == 0)
throw(MAL,"bpm.range","Could not access BAT");
- /* get the comparison function */
- tpe= *bid >0? b->htype:b->ttype;
- cmp= BATatoms[tpe].atomCmp;
- nilptr = ATOMnilptr(tpe);
- low_nil = ((*cmp) (*low, nilptr) == 0);
- hgh_nil = ((*cmp) (*hgh, nilptr) == 0);
- BBPunfix(b->batCacheid); /* don't need it anymore */
- for( ; i ; i= bpmcat[i].nxt){
@:getBATdescriptor(&bpmcat[i].bid,b,"bpm.range")@
/* determine overlag by excluding outliers */
if( *bid > 0 ){
if( (*cmp) ((ptr) &bpmcat[i].thgh, *low) < 0 ||
(*cmp) ((ptr) &bpmcat[i].tlow, *hgh) > 0
){
/* this fragment need not be split */
} else {
/* break the fragment */
+#ifdef _DEBUG_BPM_
stream_printf(GDKout, "break fragment %d\n",i);
+#endif
}
} else {
/* use reversed bat */
if( (*cmp) ((ptr) &bpmcat[i].hhgh, *low) < 0 ||
(*cmp) ((ptr) &bpmcat[i].hlow, *hgh) > 0
){
/* this fragment need not be split */
} else {
+#ifdef _DEBUG_BPM_
stream_printf(GDKout, "break fragment %d\n",i);
+#endif
/* break the fragment */
}
}
BBPunfix(b->batCacheid);
- } *ret= 0;
- throw(MAL, "bpm.range","NYI");
- return MAL_SUCCEED;
} +@- +The vector approach could either use a single column +of ordered split points, or a double column with (low,hgh) +value pairs. (STILL TO BE DECIDED) +We should ensure that the range vector is sorted first. +@c str BPMrangeVector(int *ret, int *bid, int *pv) {
- BAT *b;
- BAT *b, *bpv;
- BUN p,q;
- /* ptr ph=0; */
- BPMopen(); @:getBATdescriptor(bid,b,"bpm.range")@
- /* determine the partitioning scheme */
- (void) b;
- (void) bid;
- (void) pv;
- *ret= 0;
- throw(MAL, "bpm.range","NYI");
- if( b == 0)
throw(MAL,"bpm.range","Cannot access BAT");
- @:getBATdescriptor(pv,bpv,"bpm.range")@
- if( bpv == 0){
BBPunfix(b->batCacheid);
throw(MAL,"bpm.range","Cannot access BAT");
- }
- /* Apply the range partition using pairs */
- BATloop(b,p,q){
ptr ph= BUNhead(b,p);
ptr pt= BUNtail(b,p);
BPMrange(ret, bid, &ph, &pt);
- }
- /* Apply the range partition using single column
- BATloop(b,p,q){
if( ph){
ph= BUNtail(b,p);
} else {
ptr pt= BUNtail(b,p);
BPMrange(ret, bid,&ph,&pt);
ph= BUNtail(b,p);
}
- }
*/
- BBPunfix(b->batCacheid);
- BBPunfix(bpv->batCacheid);
- return MAL_SUCCEED;
} str BPMhash(int *ret, int *bid, int *slots, int *prime) @@ -654,77 +741,140 @@ str BPMtake(int *ret, str *nme) {
- int i; BPMopen();
- (void) nme;
- *ret =0;
- throw(MAL, "bpm.take","NYI");
- i= getPartitionName(*nme);
- if( i== 0){
throw(MAL,"pbm.take","Partitioned BAT does not exist");
- }
- *ret = bpmcat[i].alias;
- return MAL_SUCCEED;
} str BPMtakePartition(int *ret, int *bid, int *index) {
- BAT *b;
- int i;
- BPMopen();
- (void) bid;
- (void) index;
- *ret =0;
- throw(MAL, "bpm.take","NYI");
- i= getPartitionIndex(*bid, *index);
- if( i== 0){
throw(MAL,"pbm.take","Partitioned BAT does not exist");
- }
- @:getBATdescriptor(bid,b,"bpm.take")@
- if( b == 0){
throw(MAL,"bpm.take","Cannot access BAT");
- }
- BBPkeepref(*ret =b->batCacheid);
- return MAL_SUCCEED;
} @- Updates +The update operations simply loop through the partitions +and apply the results. We can optimize this by inspection +of the range tables, but that is considered relevant in a distributed +case. +The updates are merely convenient operators, because ideally +the optimizer already filters out the partitions of interest. @c str BPMinsert(int *ret, int *bid, int *ins) {
- BAT *b;
- int i;
- str msg= MAL_SUCCEED;
- Partition p;
- BPMopen();
- @:getBATdescriptor(bid,b,"bpm.insert")@
- p= getAlias(*bid);
- if( p== 0)
/* distributed the content */throw(MAL,"bpm.insert","Can not alias BAT");
- (void) b;
- (void) ins;
- for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
msg=BKCinsert_bat(ret,&bpmcat[i].bid,ins);
- } *ret= 0;
- throw(MAL, "bpm.insert","NYI");
- return msg;
}
str BPMdelete(int *ret, int *bid, int *del) {
- BAT *b;
- int i;
- Partition p;
- str msg= MAL_SUCCEED;
- BPMopen();
- @:getBATdescriptor(bid,b,"bpm.delete")@
- /* delete some information from the partitions */
- (void) b;
- (void) del;
- p= getAlias(*bid);
- if( p== 0)
throw(MAL,"bpm.insert","Can not alias BAT");
- /* distributed the content */
- for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
msg=BKCdelete_bat(ret,&bpmcat[i].bid,del);
- } *ret= 0;
- throw(MAL, "bpm.delete","NYI");
- return msg;
}
str BPMreplace(int *ret, int *bid, int *ins, int *del) {
- BAT *b;
- int i;
- Partition p;
- str msg= MAL_SUCCEED;
- BPMopen();
- @:getBATdescriptor(bid,b,"bpm.insert")@
- /* replace values */
- (void) b;
- (void) ins;
- (void) del;
- p= getAlias(*bid);
- if( p== 0)
throw(MAL,"bpm.insert","Can not alias BAT");
- /* distributed the content */
- for( i=p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
msg=BKCdelete_bat(ret,&bpmcat[i].bid,del);
It should be msg=BKCdelete_bat(ret,&bpmcat[i].pid,del);
Right?
if( msg == MAL_SUCCEED)
msg=BKCinsert_bat(ret,&bpmcat[i].bid,ins);
It should be : msg=BKCinsert_bat(ret,&bpmcat[i].pid,ins); Rigth?
- } *ret= 0;
- throw(MAL, "bpm.replace","NYI");
- return msg;
}
str BPMgetNames(int *ret) {
- BAT *b;
- int i;
- BPMopen();
- *ret =0;
- throw(MAL, "bpm.getNames","NYI");
- b = BATnew(TYPE_int, TYPE_str, BBPsize);
- if (b == 0)
throw(MAL, "catalog.bbpNames", "failed to create BAT");
- for(i=1; i<bpmcattop; i++)
BUNins(b, &i, bpmcat[i].name,TRUE);
- BBPkeepref(*ret = b->batCacheid);
- if (!(b->batDirty&2))
b = BATsetaccess(b, BAT_READ);
- return MAL_SUCCEED;
}
str BPMdiscard(int *ret, int *bid) {
- Partition p;
- int i;
- signed char r;
- str msg= MAL_SUCCEED;
- BPMopen();
- (void) bid;
- p= getAlias(*bid);
- if( p== 0)
throw(MAL,"bpm.discard","Can not alias BAT");
- /* discard the content */
- for(i= p->bid ; i && msg== MAL_SUCCEED; i= bpmcat[i].nxt){
msg= BKCdestroy(&r, &bpmcat[i].bid);
It should be:
msg= BKCdestroy(&ret, &bpmcat[i].bid);
Right?
If I am right I will fix and commit it. These differences created compilation errors in windows.
Regards, Romulo
participants (1)
-
Romulo Goncalves