changeset 86126:5594cf709444

Let mapi_request_upload do the bstream handling
author Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com>
date Wed, 20 Jul 2022 11:13:25 +0200
parents c5976436a2d7
children 75a75d556a2c
files common/stream/mapi_stream.c common/stream/stream.h sql/backends/monet5/sql.c sql/backends/monet5/sql_bincopyfrom.c
diffstat 4 files changed, 19 insertions(+), 35 deletions(-) [+]
line wrap: on
line diff
--- a/common/stream/mapi_stream.c
+++ b/common/stream/mapi_stream.c
@@ -92,35 +92,36 @@ recv_upload_destroy(stream *s)
 
 
 stream*
-mapi_request_upload(const char *filename, bool binary, stream *from, stream *to)
+mapi_request_upload(const char *filename, bool binary, bstream *bs, stream *ws)
 {
 	const char *msg = NULL;
 	stream *s = NULL;
 	struct mapi_recv_upload *state = NULL;
 	ssize_t nwritten;
 
-	assert(from->readonly);
-	assert(!to->readonly);
-	assert(isa_block_stream(from));
-	assert(isa_block_stream(to));
+	while (!bs->eof)
+		bstream_next(bs);
+	stream *rs = bs->s;
+	assert(isa_block_stream(ws));
+	assert(isa_block_stream(rs));
 
 	if (binary)
-		nwritten = mnstr_printf(to, "%srb %s\n", PROMPT3, filename);
+		nwritten = mnstr_printf(ws, "%srb %s\n", PROMPT3, filename);
 	else
-		nwritten = mnstr_printf(to, "%sr 0 %s\n", PROMPT3, filename);
+		nwritten = mnstr_printf(ws, "%sr 0 %s\n", PROMPT3, filename);
 	if (nwritten <= 0) {
-		msg = mnstr_peek_error(to);
+		msg = mnstr_peek_error(ws);
 		goto end;
 	}
-	if (mnstr_flush(to, MNSTR_FLUSH_ALL) < 0) {
-		msg = mnstr_peek_error(to);
+	if (mnstr_flush(ws, MNSTR_FLUSH_ALL) < 0) {
+		msg = mnstr_peek_error(ws);
 		goto end;
 	}
 
 	char buf[256];
-	if (mnstr_readline(from, buf, sizeof(buf)) != 1 || buf[0] != '\n') {
+	if (mnstr_readline(rs, buf, sizeof(buf)) != 1 || buf[0] != '\n') {
 		msg = buf;
-		discard(from);
+		discard(rs);
 		goto end;
 	}
 
@@ -135,8 +136,8 @@ mapi_request_upload(const char *filename
 		msg = mnstr_peek_error(NULL);
 		goto end;
 	}
-	state->from_client = from;
-	state->to_client = to;
+	state->from_client = rs;
+	state->to_client = ws;
 	s->stream_data.p = state;
 	s->binary= binary;
 	s->read = recv_upload_read;
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -220,9 +220,6 @@ stream_export stream *block_stream(strea
 stream_export bool isa_block_stream(const stream *s); // mapi.c, mal_client.c, remote.c, sql_scenario.c/sqlReader, sql_scan.c
 stream_export stream *bs_stream(stream *s); // unused
 
-
-stream_export stream *mapi_request_upload(const char *filename, bool binary, stream *from, stream *to);
-
 typedef enum {
 	PROTOCOL_AUTO = 0, // unused
 	PROTOCOL_9 = 1, // mal_mapi.c, mal_client.c;
@@ -265,7 +262,8 @@ stream_export stream *stream_blackhole_c
 
 stream_export stream *stream_fwf_create(stream *restrict s, size_t num_fields, size_t *restrict widths, char filler); // sql.c
 
-
 stream_export stream *create_text_stream(stream *s);
 
+stream_export stream *mapi_request_upload(const char *filename, bool binary, bstream *rs, stream *ws);
+
 #endif /*_STREAM_H_*/
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -3111,14 +3111,7 @@ mvc_import_table_wrap(Client cntxt, MalB
 		msg = mvc_import_table(cntxt, &b, be->mvc, be->mvc->scanner.rs, t, tsep, rsep, ssep, ns, sz, offset, besteffort, true, escape);
 	} else {
 		if (onclient) {
-			stream *ws = be->mvc->scanner.ws;
-			bstream *bs = be->mvc->scanner.rs;
-			while (!bs->eof)
-				bstream_next(bs);
-			stream *rs = bs->s;
-			assert(isa_block_stream(ws));
-			assert(isa_block_stream(rs));
-			ss = mapi_request_upload(fname, false, rs, ws);
+			ss = mapi_request_upload(fname, false, be->mvc->scanner.rs, be->mvc->scanner.ws);
 		} else {
 			ss = open_rastream(fname);
 		}
--- a/sql/backends/monet5/sql_bincopyfrom.c
+++ b/sql/backends/monet5/sql_bincopyfrom.c
@@ -580,15 +580,7 @@ importColumn(backend *be, bat *ret, BUN 
 
 	// Open the input stream
 	if (onclient) {
-
-			stream *ws = be->mvc->scanner.ws;
-			bstream *bs = be->mvc->scanner.rs;
-			while (!bs->eof)
-				bstream_next(bs);
-			stream *rs = bs->s;
-			assert(isa_block_stream(ws));
-			assert(isa_block_stream(rs));
-			s = stream_to_close = mapi_request_upload(path, true, rs, ws);
+		s = stream_to_close = mapi_request_upload(path, true, be->mvc->scanner.rs, be->mvc->scanner.ws);
 	} else {
 		s = stream_to_close = open_rstream(path);
 	}