[Monetdb-developers] [Monetdb-checkins] MonetDB5/src/modules/mal remote.mx, , 1.48, 1.49

Romulo Goncalves R.A.Goncalves at cwi.nl
Sun Aug 24 03:04:23 CEST 2008


Fabian wrote:
> Update of /cvsroot/monetdb/MonetDB5/src/modules/mal
> In directory sc8-pr-cvs16.sourceforge.net:/tmp/cvs-serv27157
> 
> Modified Files:
> 	remote.mx 
> Log Message:
> Redo remote.mx to no longer have externally visible functions that are only for internal use.  Also minimise the number of messages a bit.  remote now assumes a strict typed scenario, meaning remote.get will require the variable to assign to to be typed.
But how bpm will compile with these changes? I am calling these 
functions there...

Romulo
> 
> U remote.mx
> Index: remote.mx
> ===================================================================
> RCS file: /cvsroot/monetdb/MonetDB5/src/modules/mal/remote.mx,v
> retrieving revision 1.48
> retrieving revision 1.49
> diff -u -d -r1.48 -r1.49
> --- remote.mx	13 Aug 2008 13:05:36 -0000	1.48
> +++ remote.mx	20 Aug 2008 11:10:59 -0000	1.49
> @@ -93,18 +93,10 @@
>  address RMTdestroy
>  comment "Destroys a previously created connection.";
>  
> -pattern describe(ident:str):void
> -address RMTdescribe
> -comment "Prints a description of the given identifier";
> -
>  pattern get(conn:str, ident:rmtobj):any
>  address RMTget
>  comment "Retrieves a copy of a remote object.";
>  
> -pattern getnextid():void
> -address RMTgetnextid
> -comment "Prints the next unique local identifier";
> -
>  pattern put(conn:str, object:any):rmtobj
>  address RMTput
>  comment "Copy an object to a remote site and returns its identifier.";
> @@ -118,10 +110,6 @@
>  comment "Remotely executes function mod.func using the argument list
>  of remote objects and returns the handle to its result";
>  
> - at -
> -@{
> -TODO: this is not thread safe yet and we should check stability
> -wrt broken communication lines.
>  @h
>  
>  #ifndef _REMOTE_DEF
> @@ -146,21 +134,22 @@
>  #include "remote.h"	/* for the implementation of the functions */
>  
>  @-
> -
> +TODO: thread safe or not?  Technically, these methods need to be
> +serialised per connection, hence a scheduler that interleaves e.g.
> +multiple get calls, simply violates this constraint.  If parallelism to
> +the same site is desired, a user could create a second connection.
>  @- Implementation
>  @h
>  typedef struct _connection {
>  	char               name[16];  /* name of the connection,
> -                                     15 chars should be enough for everyone */
> +	                                 15 chars should be enough for everyone */
>  	Mapi               mconn;     /* the Mapi handle for the connection */
> +	size_t             nextid;    /* id counter */
>  	struct _connection *next;     /* the next connection in the list */
>  } *connection;
>  
>  remote_export str RMTprelude(int *ret);
>  remote_export str RMTepilogue(int *ret);
> -remote_export str RMTinternalput(str *ret, Mapi mconn, int type, ptr value);
> -remote_export str RMTinternalget(Mapi mconn, str ident, ValPtr v);
> -remote_export str RMTfindconn(connection *ret, str conn);
>  
>  @-
>  
> @@ -171,15 +160,14 @@
>   * Helper function to return a connection matching a given string, or an
>   * error if it does not exist.  Before the connection is returned, it is
>   * made sure that it is usable.  If the connection cannot be made
> - * useable, an error is returned.
> + * useable, an error is returned.  Since this function is internal, it
> + * doesn't check the argument conn, as it should have been checked
> + * already.
>   */
> -str 
> +static INLINE str
>  RMTfindconn(connection *ret, str conn) {
>  	connection c = connections;
>  
> -	/* conn should be passed as valid variable here */
> -
> -
>  	/* just make sure the return isn't garbage */
>  	*ret = NULL;
>  
> @@ -202,11 +190,28 @@
>  }
>  
>  /**
> + * Little helper function that returns a GDKmalloced string containing a
> + * valid identifier that is supposed to be unique in the connection's
> + * remote context.  Since this function is internal, it doesn't check
> + * the argument conn, as it should have been checked already.
> + */
> +static INLINE str /*synchronized*/
> +RMTgetNextId(str *ret, connection conn) {
> +	char buf[128];
> +	snprintf(buf, 128, "remote_%s_id_" SZFMT, conn->name, conn->nextid++);
> +	*ret = GDKstrdup(buf);
> +	return(MAL_SUCCEED);
> +}
> +
> +/**
>   * Helper function to execute a query over the given connection,
>   * returning the result handle.  If communication fails in one way or
> - * another, an error is returned.
> + * another, an error is returned.  Since this function is internal, it
> + * doesn't check the input arguments func, conn and query, as they
> + * should have been checked already.
>   */
> -static INLINE str RMTquery(MapiHdl *ret, str func, Mapi conn, str query) {
> +static INLINE str
> +RMTquery(MapiHdl *ret, str func, Mapi conn, str query) {
>  	MapiHdl mhdl;
>  
>  	*ret = NULL;
> @@ -225,7 +230,7 @@
>  			throw(IO, func, "an error occurred on connection: %s",
>  					mapi_error_str(conn));
>  		} else {
> -			throw(IO, func, "remote function invocation didn't return a result");
> +			throw(MAL, func, "remote function invocation didn't return a result");
>  		}
>  	}
>  
> @@ -233,143 +238,83 @@
>  	return(MAL_SUCCEED);
>  }
>  
> -/**
> - * Helper function to place the given object on the remote host via the
> - * given connection.  An id will be fetched from the remote host, and
> - * set in ret if no problems occur, on which an error is returned.  The
> - * id can be used to reference the object put on the remote host.
> - */
> -str 
> -RMTinternalput(str *ret, Mapi mconn, int type, ptr value) {
> -	str tmp;
> -	str ident;
> -	MapiHdl mhdl = NULL;
> -
> -	/* get a free identifier on the remote host */
> -	rethrow("remote.put", tmp, RMTquery(&mhdl, "remote.put", mconn, "remote.getnextid();"));
> -	mapi_fetch_row(mhdl); /* should succeed */
> -	tmp = mapi_fetch_field(mhdl, 0); /* should be there */
> -	if (tmp == NULL) {
> -		mapi_close_handle(mhdl);
> -		throw(MAL, "remote.put", "missing first column in tuple");
> -	}
> -	/* allocate on the stack as not to leak when we error lateron */
> -	ident = alloca(sizeof(char) * (strlen(tmp) + 1));
> -	memcpy(ident, tmp, strlen(tmp) + 1);
> -	mapi_close_handle(mhdl);
> -
> -	/* depending on the input object generate actions to store the
> -	 * object remotely*/
> -	if (type == TYPE_any || isAnyExpression(type)) {
> -		throw(MAL, "remote.put", "cannot deal with '%s' type",
> -				getTypeName(type));
> -	} else if (isaBatType(type)) {
> -		BATiter bi;
> -		/* naive approach using bat.new() and bat.insert() calls */
> -		char head[10], tail[10];
> -		char qbuf[512]; /* FIXME: this should be dynamic */
> -		int bid;
> -		BAT *b;
> -		BUN p, q;
> -		str headv, tailv;
> -
> -		if (getHeadIndex(type) > 0) {
> -			sprintf(head, "any%c%d", TMPMARKER, getHeadIndex(type));
> -		} else if (getHeadType(type) == TYPE_any) {
> -			sprintf(head, "any");
> -		} else {
> -			sprintf(head, "%s", ATOMname(getHeadType(type)));
> -		}
> -		if (getTailIndex(type) > 0) {
> -			sprintf(tail, "any%c%d", TMPMARKER, getTailIndex(type));
> -		} else if (getTailType(type) == TYPE_any) {
> -			sprintf(tail, "any");
> -		} else {
> -			sprintf(tail, "%s", ATOMname(getTailType(type)));
> -		}
> +str RMTprelude(int *ret) {
> +	(void)ret;
>  
> -		bid = *(int *)value;
> -		if ((b = BATdescriptor(bid)) == NULL)
> -			throw(MAL, "remote.put", "cannot access BAT descriptor");
> +	return(MAL_SUCCEED);
> +}
>  
> -		qbuf[511] = '\0';
> -		snprintf(qbuf, 511, "%s := bat.new(:%s, :%s, " SZFMT ");",
> -				ident, head, tail, BATcount(b));
> +str RMTepilogue(int *ret) {
> +	connection c = connections, t;
>  
> -		rethrow("remote.put", tmp, RMTquery(&mhdl, "remote.put", mconn, qbuf));
> -		mapi_close_handle(mhdl);
> +	(void)ret;
>  
> -		headv = tailv = NULL;
> -		bi = bat_iterator(b);
> -		BATloop(b, p, q) {
> -			ATOMformat(getHeadType(type), BUNhead(bi, p), &headv);
> -			ATOMformat(getTailType(type), BUNtail(bi, p), &tailv);
> -			snprintf(qbuf, 511, "bat.insert(%s, %s:%s, %s:%s);",
> -					ident, headv, head, tailv, tail);
> -			rethrow("remote.put", tmp, RMTquery(&mhdl, "remote.put", mconn, qbuf));
> -			/* we leak headv and tailv here if an exception is thrown */
> -			mapi_close_handle(mhdl);
> -		}
> -		GDKfree(headv);
> -		GDKfree(tailv);
> -		BBPunfix(b->batCacheid);
> -	} else {
> -		str val = NULL;
> -		char qbuf[512]; /* FIXME: this should be dynamic */
> -		if (ATOMvarsized(type)) {
> -			ATOMformat(type, *(str *)value, &val);
> -		} else {
> -			ATOMformat(type, value, &val);
> -		}
> -		snprintf(qbuf, 511, "%s := %s:%s;\n", ident, val, ATOMname(type));
> -		qbuf[511] = '\0';
> -		GDKfree(val);
> -		rethrow("remote.put", tmp, RMTquery(&mhdl, "remote.put", mconn, qbuf));
> -		mapi_close_handle(mhdl);
> +	/* free connections list */
> +	while (c != NULL) {
> +		t = c;
> +		c = c->next;
> +		mapi_destroy(t->mconn);
> +		GDKfree(t);
>  	}
> +	/* not sure, but better be safe than sorry */
> +	connections = NULL;
>  
> -	*ret = GDKstrdup(ident);
>  	return(MAL_SUCCEED);
>  }
>  
> + at h
> +remote_export str RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
> + at c
>  /**
> - * Helper function to return a remote object via the given connection as
> - * a local object.
> + * get fetches the object referenced by ident over connection conn.
>   */
> -str 
> -RMTinternalget(Mapi mconn, str ident, ValPtr v) {
> -	str type, tmp;
> -	int vtype;
> +str RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
> +	str conn, ident, tmp;
> +	connection c;
>  	char qbuf[512];
>  	MapiHdl mhdl;
> +	int rtype;
> +	ValPtr v;
>  
> -	snprintf(qbuf, 511, "remote.describe(\"%s\");", ident);
> -	rethrow("remote.get", tmp, RMTquery(&mhdl, "remote.get", mconn, qbuf));
> +	(void)cntxt;
> +	(void)mb;
>  
> -	mapi_fetch_row(mhdl); /* should succeed */
> -	type = mapi_fetch_field(mhdl, 0); /* should be there */
> -	if (type == NULL) {
> -		mapi_close_handle(mhdl);
> -		throw(MAL, "remote.get", "missing first column in tuple");
> +	conn = (str)getArgValue(stk, pci, 1);
> +	if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
> +		throw(ILLARG, "remote.get", "connection name is NULL or nil");
> +	ident = (str)getArgValue(stk, pci, 2);
> +	if (isIdentifier(ident) < 0)
> +		throw(ILLARG, "remote.get", "identifier expected, got '%s'", ident);
> +
> +	/* lookup conn, set mconn if valid */
> +	rethrow("remote.get", tmp, RMTfindconn(&c, conn));
> +
> +	rtype = getArgType(mb, pci, 0);
> +	v = &stk->stk[getArg(pci, 0)];
> +
> +	if (rtype == TYPE_any || isAnyExpression(rtype)) {
> +		throw(MAL, "remote.get", "cannot deal with any (%s) type",
> +				getTypeName(rtype));
>  	}
>  
> -	vtype = getTypeIndex(type, (int) strlen(type), -1);
> -	if (vtype == TYPE_bat) {
> +	if (isaBatType(rtype)) {
>  		int h, t, s;
>  		ptr l, r;
>  		lng len;
>  		str val, var;
>  		BAT *b;
>  
> -		/* no checks on the existence of the values */
> -		val = mapi_fetch_field(mhdl, 1);
> -		h = getTypeIndex(val, (int) strlen(val), -1);
> -		val = mapi_fetch_field(mhdl, 2);
> -		t = getTypeIndex(val, (int) strlen(val), -1);
> -		vtype = newBatType(h, t);
> -		val = mapi_fetch_field(mhdl, 3);
> +		snprintf(qbuf, 511, "user.remote_internal_servegetbat(\"%s\", %s);",
> +				getTypeName(rtype), ident);
> +		rethrow("remote.get", tmp,
> +				RMTquery(&mhdl, "remote.get", c->mconn, qbuf));
> +		mapi_fetch_row(mhdl); /* should succeed */
> +		val = mapi_fetch_field(mhdl, 0);
> +		if (val == NULL) {
> +			mapi_close_handle(mhdl);
> +			throw(MAL, "remote.get", "missing first column in tuple");
> +		}
>  		len = atol(val);
> -		mapi_close_handle(mhdl);
>  
>  #if SIZEOF_SIZE_T == 4
>  		if ((lng)(len & 0x80000000FFFFFFFF) != len)
> @@ -378,15 +323,18 @@
>  #endif
>  		/* FIXME: len < 0 ???? */
>  
> -		snprintf(qbuf, 511, "io.print(%s);", ident);
> -		rethrow("remote.get", tmp, RMTquery(&mhdl, "remote.get", mconn, qbuf));
> -
>  		assert(0 <= len && len <= (lng)GDK_oid_max);
> +		h = getHeadType(rtype);
> +		t = getTailType(rtype);
>  		b = BATnew(h, t, (size_t)len);
>  
>  		while (len-- > 0) {
> -			mapi_fetch_row(mhdl);
> -			val = mapi_fetch_field(mhdl, 0); /* should be there */
> +			if (mapi_fetch_row(mhdl) == 0) {
> +				mapi_close_handle(mhdl);
> +				throw(MAL, "remote.get",
> +						"expected %d more rows in result", len + 1);
> +			}
> +			val = mapi_fetch_field(mhdl, 0); /* should both be there */
>  			var = mapi_fetch_field(mhdl, 1);
>  			if (ATOMvarsized(h)) {
>  				l = (ptr)(val == NULL ? str_nil : val);
> @@ -413,28 +361,26 @@
>  			if (!ATOMvarsized(t)) GDKfree(r);
>  		}
>  
> -		/* adjust the return type */
> -		v->vtype = vtype;
>  		v->val.ival = b->batCacheid;
> -	} else if (vtype == TYPE_any) {
> -		mapi_close_handle(mhdl);
> -		throw(MAL, "remote.get", "can't handle variables of type 'any'");
>  	} else {
> -		/* we risk getting something we don't understand here */
>  		ptr p = NULL;
>  		str val;
>  		int len = 0;
>  
> -		/* no checks on the existence of the value */
> -		val = mapi_fetch_field(mhdl, 1);
> +		snprintf(qbuf, 511, "user.remote_internal_serveget(\"%s\", %s);",
> +				getTypeName(rtype), ident);
> +		rethrow("remote.get", tmp,
> +				RMTquery(&mhdl, "remote.get", c->mconn, qbuf));
> +		mapi_fetch_row(mhdl); /* should succeed */
> +		val = mapi_fetch_field(mhdl, 0);
>  
> -		if (ATOMvarsized(vtype)) {
> -			VALset(v, vtype, GDKstrdup(val));
> +		if (ATOMvarsized(rtype)) {
> +			VALset(v, rtype, GDKstrdup(val == NULL ? str_nil : val));
>  		} else {
> -			ATOMfromstr(vtype, &p, &len, val);
> +			ATOMfromstr(rtype, &p, &len, val == NULL ? "nil" : val);
>  			if (p != NULL) {
> -				VALset(v, vtype, p);
> -				if (ATOMextern(vtype) == 0)
> +				VALset(v, rtype, p);
> +				if (ATOMextern(rtype) == 0)
>  					GDKfree(p);
>  			} else {
>  				char tval[512];
> @@ -451,194 +397,116 @@
>  	return(MAL_SUCCEED);
>  }
>  
> -str RMTprelude(int *ret) {
> -	(void)ret;
> -
> -	return(MAL_SUCCEED);
> -}
> -
> -str RMTepilogue(int *ret) {
> -	connection c = connections, t;
> -
> -	(void)ret;
> -
> -	/* free connections list */
> -	while (c != NULL) {
> -		t = c;
> -		c = c->next;
> -		mapi_destroy(t->mconn);
> -		GDKfree(t);
> -	}
> -	/* not sure, but better be safe than sorry */
> -	connections = NULL;
> -
> -	return(MAL_SUCCEED);
> -}
> -
>  @h
> -remote_export str RMTdescribe(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
> +remote_export str RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
>  @c
>  /**
> - * Describe() prints a descriptive string for the given identifier ident.
> - * This method is used by get() to retrieve an object.  Describe() prints
> - * a single tuple with information on the object.  The first column in
> - * the tuple is the MAL type of the object.  Depending on this, the
> - * other columns are filled in.  For scalar data types, the second
> - * column represents the value of the scalar as a string, e.g.
> - * [ "int", 1 ]
> - * More complex types that need to be fetched afterwards just get extra
> - * information where available, e.g. to describe a BAT with oid head and
> - * str tail and one BUN:
> - * [ "bat", "oid", "str", 1 ]
> + * stores the given object on the remote host.  The identifier of the
> + * object on the remote host is returned for later use.
>   */
> -str RMTdescribe(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
> -	int i;
> -	str name;
> -	VarPtr n;
> +str RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
> +	str conn, tmp;
> +	str ident;
> +	connection c;
>  	ValPtr v;
> -	char ret[256];
> +	int type;
> +	ptr value;
> +	MapiHdl mhdl = NULL;
>  
> -	(void) cntxt;
> -	name = (str)getArgValue(stk, pci, 1);
> -	i = findVariable(mb, name);
> -	if (i < 0)
> -		throw(MAL, "remote.describe", "no such object '%s'", name);
> +	(void)cntxt;
>  
> -	n = getVar(mb, i);
> -	v = stk->stk + i;
> -	if (n->type == TYPE_any || isAnyExpression(n->type)) {
> -		throw(MAL, "remote.describe", "cannot deal with '%s' type",
> -				getTypeName(n->type));
> -	} else if (isaBatType(n->type)) {
> +	conn = (str)getArgValue(stk, pci, 1);
> +	if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
> +		throw(ILLARG, "remote.put", "connection name is NULL or nil");
> +
> +	/* lookup conn */
> +	rethrow("remote.put", tmp, RMTfindconn(&c, conn));
> +
> +	/* put the thing */
> +	type = getArgType(mb, pci, 2);
> +	value = getArgReference(stk, pci, 2);
> +
> +	/* get a free identifier for the remote host */
> +	RMTgetNextId(&tmp, c);
> +	/* allocate on the stack as not to leak when we error lateron */
> +	ident = alloca(sizeof(char) * (strlen(tmp) + 1));
> +	memcpy(ident, tmp, strlen(tmp) + 1);
> +	GDKfree(tmp); /* FIXME, this is inefficient... */
> +
> +	/* depending on the input object generate actions to store the
> +	 * object remotely*/
> +	if (type == TYPE_any || isAnyExpression(type)) {
> +		throw(MAL, "remote.put", "cannot deal with '%s' type",
> +				getTypeName(type));
> +	} else if (isaBatType(type)) {
> +		BATiter bi;
> +		/* naive approach using bat.new() and bat.insert() calls */
>  		char head[10], tail[10];
> +		char qbuf[512]; /* FIXME: this should be dynamic */
> +		int bid;
>  		BAT *b;
> +		BUN p, q;
> +		str headv, tailv;
>  
> -		if (getHeadIndex(n->type) > 0) {
> -			sprintf(head, "any%c%d", TMPMARKER, getHeadIndex(n->type));
> -		} else if (getHeadType(n->type) == TYPE_any) {
> +		if (getHeadIndex(type) > 0) {
> +			sprintf(head, "any%c%d", TMPMARKER, getHeadIndex(type));
> +		} else if (getHeadType(type) == TYPE_any) {
>  			sprintf(head, "any");
>  		} else {
> -			sprintf(head, "%s", ATOMname(getHeadType(n->type)));
> +			sprintf(head, "%s", ATOMname(getHeadType(type)));
>  		}
> -		if (getTailIndex(n->type) > 0) {
> -			sprintf(tail, "any%c%d", TMPMARKER, getTailIndex(n->type));
> -		} else if (getTailType(n->type) == TYPE_any) {
> +		if (getTailIndex(type) > 0) {
> +			sprintf(tail, "any%c%d", TMPMARKER, getTailIndex(type));
> +		} else if (getTailType(type) == TYPE_any) {
>  			sprintf(tail, "any");
>  		} else {
> -			sprintf(tail, "%s", ATOMname(getTailType(n->type)));
> +			sprintf(tail, "%s", ATOMname(getTailType(type)));
>  		}
>  
> -		if ((b = BATdescriptor(v->val.ival)) == NULL)
> -			throw(MAL, "remote.describe",
> -					"invalid BAT descriptor for '%s'", name);
> -		snprintf(ret, 255, "[ \"bat\",\t\"%s\",\t\"%s\",\t" SZFMT "\t]",
> -				head, tail, BATcount(b));
> +		bid = *(int *)value;
> +		if ((b = BATdescriptor(bid)) == NULL)
> +			throw(MAL, "remote.put", "cannot access BAT descriptor");
> +
> +		qbuf[511] = '\0';
> +		snprintf(qbuf, 511, "%s := bat.new(:%s, :%s, " SZFMT ");",
> +				ident, head, tail, BATcount(b));
> +
> +		rethrow("remote.put", tmp, RMTquery(&mhdl, "remote.put", c->mconn, qbuf));
> +		mapi_close_handle(mhdl);
> +
> +		headv = tailv = NULL;
> +		bi = bat_iterator(b);
> +		BATloop(b, p, q) {
> +			ATOMformat(getHeadType(type), BUNhead(bi, p), &headv);
> +			ATOMformat(getTailType(type), BUNtail(bi, p), &tailv);
> +			snprintf(qbuf, 511, "bat.insert(%s, %s:%s, %s:%s);",
> +					ident, headv, head, tailv, tail);
> +			rethrow("remote.put", tmp, RMTquery(&mhdl, "remote.put", c->mconn, qbuf));
> +			/* we leak headv and tailv here if an exception is thrown */
> +			mapi_close_handle(mhdl);
> +		}
> +		GDKfree(headv);
> +		GDKfree(tailv);
>  		BBPunfix(b->batCacheid);
>  	} else {
>  		str val = NULL;
> -		if (ATOMvarsized(n->type)) {
> -			ATOMformat(n->type, *(str *)v, &val);
> +		char qbuf[512]; /* FIXME: this should be dynamic */
> +		if (ATOMvarsized(type)) {
> +			ATOMformat(type, *(str *)value, &val);
>  		} else {
> -			ATOMformat(n->type, v, &val);
> +			ATOMformat(type, value, &val);
>  		}
> -		snprintf(ret, 255, "[ \"%s\",\t%s\t]", ATOMname(n->type), val);
> +		snprintf(qbuf, 511, "%s := %s:%s;\n", ident, val, ATOMname(type));
> +		qbuf[511] = '\0';
>  		GDKfree(val);
> +		rethrow("remote.put", tmp, RMTquery(&mhdl, "remote.put", c->mconn, qbuf));
> +		mapi_close_handle(mhdl);
>  	}
> -	ret[255] = '\0';
> -
> -	stream_printf(cntxt->fdout, "%s\n", ret);
> -
> -	return(MAL_SUCCEED);
> -}
> -@}
> -
> - at h
> -remote_export str RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
> - at c
> -/**
> - * get fetches the object referenced by ident over connection conn.
> - */
> -str RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
> -	str conn, ident, tmp;
> -	connection c;
> -
> -	(void)cntxt;
> -	(void)mb;
> -
> -	conn = (str)getArgValue(stk, pci, 1);
> -	if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
> -		throw(ILLARG, "remote.get", "connection name is NULL or nil");
> -	ident = (str)getArgValue(stk, pci, 2);
> -	if (isIdentifier(ident) < 0)
> -		throw(ILLARG, "remote.get", "identifier expected, got '%s'", ident);
> -
> -	/* lookup conn, set mconn if valid */
> -	rethrow("remote.get", tmp, RMTfindconn(&c, conn));
> -
> -	/* perform the get */
> -	rethrow("remote.get", tmp, RMTinternalget(c->mconn,
> -				ident, &stk->stk[getArg(pci, 0)]));
> -
> -	return(MAL_SUCCEED);
> -}
> -
> - at h
> -remote_export str RMTgetnextid(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
> - at c
> -static unsigned int _remote_object_counter = 0;
> -/**
> - * getnextid prints the next unique local identifier as a tuple with one
> - * column, e.g.:
> - * [ "remote_put_object_1" ]
> - * This method is used by put() to store an object.  The identifiers
> - * generated by getnextid are based on a simple and naive strategy of
> - * having a prefix and a counter that result in a hopefully unique
> - * identifier.  The current prefix is "remote_put_object_" which should
> - * reduce possible conflicts to a minimum.
> - */
> -str RMTgetnextid(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
> -{
> -	int *ret = (int *)getArgReference(stk,pci,0);
> -	(void)mb;
> -
> -	stream_printf(cntxt->fdout, "[ \"remote_put_object_%d\"\t]\n",
> -			_remote_object_counter++);
> -
> -	*ret = 0;
> -	return(MAL_SUCCEED);
> -}
> -
> - at h
> -remote_export str RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
> - at c
> -/**
> - * stores the given object on the remote host.  The identifier of the
> - * object on the remote host is returned for later use.
> - */
> -str RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
> -	str conn, tmp;
> -	str ident;
> -	connection c;
> -	ValPtr v;
> -
> -	(void) cntxt;
> -	conn = (str)getArgValue(stk, pci, 1);
> -	if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
> -		throw(ILLARG, "remote.put", "connection name is NULL or nil");
> -
> -	/* lookup conn */
> -	rethrow("remote.put", tmp, RMTfindconn(&c, conn));
> -
> -	/* put the thing */
> -	rethrow("remote.put", tmp, RMTinternalput(&ident,
> -				c->mconn,
> -				getArgType(mb, pci, 2),
> -				getArgReference(stk, pci, 2)));
>  
>  	/* return the identifier */
>  	v = &stk->stk[getArg(pci, 0)];
>  	v->vtype = TYPE_str;
> -	v->val.sval = ident;
> +	v->val.sval = GDKstrdup(ident);
>  	return(MAL_SUCCEED);
>  }
>  
> @@ -678,22 +546,15 @@
>  	rethrow("remote.exec", tmp, RMTfindconn(&c, conn));
>  
>  	/* get a free identifier on the remote host */
> -	mhdl = NULL;
> -	rethrow("remote.exec", tmp, RMTquery(&mhdl, "remote.exec", c->mconn, "remote.getnextid();"));
> -	mapi_fetch_row(mhdl); /* should succeed */
> -	tmp = mapi_fetch_field(mhdl, 0); /* should be there */
> -	if (tmp == NULL) {
> -		mapi_close_handle(mhdl);
> -		throw(MAL, "remote.exec", "missing first column in tuple");
> -	}
> -	i = (int) strlen(tmp) + 1;
> +	RMTgetNextId(&tmp, c);
> +	i = (int)strlen(tmp) + 1;
>  	ident = alloca(sizeof(char) * i);
>  	memcpy(ident, tmp, i);
> +	GDKfree(tmp);
>  
>  	/* build the function invocation string in qbuf */
>  	len = 0;
>  	len += snprintf(&qbuf[len], 511 - len, "%s := %s.%s(", ident, mod, func);
> -	mapi_close_handle(mhdl);
>  
>  	/* handle the arguments to the function */
>  	assert(pci->argc >= 3); /* ret, conn, func, ... */
> @@ -730,6 +591,36 @@
>  		str *user,
>  		str *passwd);
>  @c
> +#define serveget "\
> +    function user.remote_internal_servegetbat(type:str, o:bat[:any_1,:any_2]):void;\n\
> +        t := inspect.getType(o);\n\
> +        barrier ifpart := type == t;\n\
> +			len := aggr.count(o);\n\
> +			io.print(len);\n\
> +            io.print(o);\n\
> +        exit ifpart;\n\
> +		barrier ifpart := calc.isnil(type);\n\
> +			io.printf(\"!MALException:user.serveget:nil type doesn't match anything\n\");\n\
> +		exit ifpart;\n\
> +		barrier ifpart := type != t;\n\
> +			io.printf(\"!MALException:user.serveget:object type (%s) \", t);\n\
> +			io.printf(\"does not match required type (%s)\n\", type);\n\
> +		exit ifpart;\n\
> +    end user.remote_internal_servegetbat;\n\
> +    function user.remote_internal_serveget(type:str, o:any):void;\n\
> +        t := inspect.getType(o);\n\
> +        barrier ifpart := type == t;\n\
> +            io.print(o);\n\
> +        exit ifpart;\n\
> +		barrier ifpart := calc.isnil(type);\n\
> +			io.printf(\"!MALException:user.serveget:nil type doesn't match anything\n\");\n\
> +		exit ifpart;\n\
> +		barrier ifpart := type != t;\n\
> +			io.printf(\"!MALException:user.serveget:object type (%s) \", t);\n\
> +			io.printf(\"does not match required type (%s)\n\", type);\n\
> +		exit ifpart;\n\
> +    end user.remote_internal_serveget;\n\
> +"
>  str RMTcreate(
>  		int *ret,
>  		str *conn,
> @@ -741,6 +632,8 @@
>  {
>  	connection c;
>  	int s;
> +	MapiHdl mhdl;
> +	str tmp;
>  
>  	if (conn == NULL || *conn == NULL || strcmp(*conn, (str)str_nil) == 0)
>  		throw(ILLARG, "remote.create", "connection name is NULL or nil");
> @@ -780,8 +673,20 @@
>  	memcpy(c->name, *conn, s + 1 /* \0 */);
>  	c->mconn = mapi_mapi(*host, *port, *user, *passwd, "mal",
>  			(strcmp(*dbname, (str)str_nil) == 0 ? NULL : *dbname));
> +	c->nextid = 0;
>  	c->next = NULL;
>  
> +	if (mapi_reconnect(c->mconn) != MOK)
> +		throw(IO, "remote.create", "an error occurred during "
> +				"connect of connection '%s': %s",
> +				conn, mapi_error_str(c->mconn));
> +	/* TODO: throw away connection? */
> +
> +	/* initialise remote helper function */
> +	rethrow("remote.create", tmp,
> +			RMTquery(&mhdl, "remote.create", c->mconn, serveget));
> +	mapi_close_handle(mhdl);
> +
>  	/* just make sure the return isn't garbage */
>  	*ret = 0;
>  
> 
> 
> -------------------------------------------------------------------------
> This SF.Net email is sponsored by the Moblin Your Move Developer's challenge
> Build the coolest Linux based applications with Moblin SDK & win great prizes
> Grand prize is a trip for two to an Open Source event anywhere in the world
> http://moblin-contest.org/redirect.php?banner_id=100&url=/
> _______________________________________________
> Monetdb-checkins mailing list
> Monetdb-checkins at lists.sourceforge.net
> https://lists.sourceforge.net/lists/listinfo/monetdb-checkins





More information about the developers-list mailing list