Mercurial > hg > MonetDB
changeset 86126:5594cf709444
Let mapi_request_upload do the bstream handling
| author | Joeri van Ruth <joeri.van.ruth@monetdbsolutions.com> |
|---|---|
| date | Wed, 20 Jul 2022 11:13:25 +0200 |
| parents | c5976436a2d7 |
| children | 75a75d556a2c |
| files | common/stream/mapi_stream.c common/stream/stream.h sql/backends/monet5/sql.c sql/backends/monet5/sql_bincopyfrom.c |
| diffstat | 4 files changed, 19 insertions(+), 35 deletions(-) [+] |
line wrap: on
line diff
--- a/common/stream/mapi_stream.c +++ b/common/stream/mapi_stream.c @@ -92,35 +92,36 @@ recv_upload_destroy(stream *s) stream* -mapi_request_upload(const char *filename, bool binary, stream *from, stream *to) +mapi_request_upload(const char *filename, bool binary, bstream *bs, stream *ws) { const char *msg = NULL; stream *s = NULL; struct mapi_recv_upload *state = NULL; ssize_t nwritten; - assert(from->readonly); - assert(!to->readonly); - assert(isa_block_stream(from)); - assert(isa_block_stream(to)); + while (!bs->eof) + bstream_next(bs); + stream *rs = bs->s; + assert(isa_block_stream(ws)); + assert(isa_block_stream(rs)); if (binary) - nwritten = mnstr_printf(to, "%srb %s\n", PROMPT3, filename); + nwritten = mnstr_printf(ws, "%srb %s\n", PROMPT3, filename); else - nwritten = mnstr_printf(to, "%sr 0 %s\n", PROMPT3, filename); + nwritten = mnstr_printf(ws, "%sr 0 %s\n", PROMPT3, filename); if (nwritten <= 0) { - msg = mnstr_peek_error(to); + msg = mnstr_peek_error(ws); goto end; } - if (mnstr_flush(to, MNSTR_FLUSH_ALL) < 0) { - msg = mnstr_peek_error(to); + if (mnstr_flush(ws, MNSTR_FLUSH_ALL) < 0) { + msg = mnstr_peek_error(ws); goto end; } char buf[256]; - if (mnstr_readline(from, buf, sizeof(buf)) != 1 || buf[0] != '\n') { + if (mnstr_readline(rs, buf, sizeof(buf)) != 1 || buf[0] != '\n') { msg = buf; - discard(from); + discard(rs); goto end; } @@ -135,8 +136,8 @@ mapi_request_upload(const char *filename msg = mnstr_peek_error(NULL); goto end; } - state->from_client = from; - state->to_client = to; + state->from_client = rs; + state->to_client = ws; s->stream_data.p = state; s->binary= binary; s->read = recv_upload_read;
--- a/common/stream/stream.h +++ b/common/stream/stream.h @@ -220,9 +220,6 @@ stream_export stream *block_stream(strea stream_export bool isa_block_stream(const stream *s); // mapi.c, mal_client.c, remote.c, sql_scenario.c/sqlReader, sql_scan.c stream_export stream *bs_stream(stream *s); // unused - -stream_export stream *mapi_request_upload(const char *filename, bool binary, stream *from, stream *to); - typedef enum { PROTOCOL_AUTO = 0, // unused PROTOCOL_9 = 1, // mal_mapi.c, mal_client.c; @@ -265,7 +262,8 @@ stream_export stream *stream_blackhole_c stream_export stream *stream_fwf_create(stream *restrict s, size_t num_fields, size_t *restrict widths, char filler); // sql.c - stream_export stream *create_text_stream(stream *s); +stream_export stream *mapi_request_upload(const char *filename, bool binary, bstream *rs, stream *ws); + #endif /*_STREAM_H_*/
--- a/sql/backends/monet5/sql.c +++ b/sql/backends/monet5/sql.c @@ -3111,14 +3111,7 @@ mvc_import_table_wrap(Client cntxt, MalB msg = mvc_import_table(cntxt, &b, be->mvc, be->mvc->scanner.rs, t, tsep, rsep, ssep, ns, sz, offset, besteffort, true, escape); } else { if (onclient) { - stream *ws = be->mvc->scanner.ws; - bstream *bs = be->mvc->scanner.rs; - while (!bs->eof) - bstream_next(bs); - stream *rs = bs->s; - assert(isa_block_stream(ws)); - assert(isa_block_stream(rs)); - ss = mapi_request_upload(fname, false, rs, ws); + ss = mapi_request_upload(fname, false, be->mvc->scanner.rs, be->mvc->scanner.ws); } else { ss = open_rastream(fname); }
--- a/sql/backends/monet5/sql_bincopyfrom.c +++ b/sql/backends/monet5/sql_bincopyfrom.c @@ -580,15 +580,7 @@ importColumn(backend *be, bat *ret, BUN // Open the input stream if (onclient) { - - stream *ws = be->mvc->scanner.ws; - bstream *bs = be->mvc->scanner.rs; - while (!bs->eof) - bstream_next(bs); - stream *rs = bs->s; - assert(isa_block_stream(ws)); - assert(isa_block_stream(rs)); - s = stream_to_close = mapi_request_upload(path, true, rs, ws); + s = stream_to_close = mapi_request_upload(path, true, be->mvc->scanner.rs, be->mvc->scanner.ws); } else { s = stream_to_close = open_rstream(path); }
