Mercurial > hg > MonetDB
changeset 86134:1b8c0e0b3295 sql_profiler
Merge with default
| author | Lucas Pereira <lucas.pereira@monetdbsolutions.com> |
|---|---|
| date | Thu, 21 Jul 2022 12:59:45 +0100 |
| parents | 1395d225bbe3 (current diff) 786ade607ae4 (diff) |
| children | |
| files | clients/Tests/exports.stable.out sql/backends/monet5/sql.c |
| diffstat | 14 files changed, 191 insertions(+), 869 deletions(-) [+] |
line wrap: on
line diff
--- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -1769,10 +1769,6 @@ int sql_trans_create_table(sql_table **t # stream stream *block_stream(stream *s); -stream *block_stream2(stream *s, size_t bufsiz, compression_method comp); -buffer bs2_buffer(stream *s); -int bs2_resizebuf(stream *ss, size_t bufsiz); -void bs2_setpos(stream *ss, size_t pos); stream *bs_stream(stream *s); bstream *bstream_create(stream *rs, size_t chunk_size); void bstream_destroy(bstream *s); @@ -1799,6 +1795,7 @@ stream *iconv_rstream(stream *restrict s stream *iconv_wstream(stream *restrict ss, const char *restrict charset, const char *restrict name); bool isa_block_stream(const stream *s); stream *lz4_stream(stream *inner, int preset); +stream *mapi_request_upload(const char *filename, bool binary, bstream *rs, stream *ws); void mnstr_clearerr(stream *s); void mnstr_close(stream *s); void mnstr_destroy(stream *s); @@ -1856,7 +1853,6 @@ stream *open_rstream(const char *filenam stream *open_urlstream(const char *url); stream *open_wastream(const char *filename); stream *open_wstream(const char *filename); -void set_prompting(stream *block_stream, const char *prompt, stream *prompt_stream); stream *socket_rstream(SOCKET socket, const char *name); stream *socket_wstream(SOCKET socket, const char *name); stream *stderr_wastream(void);
--- a/clients/mapilib/CMakeLists.txt +++ b/clients/mapilib/CMakeLists.txt @@ -17,7 +17,6 @@ add_library(mapi target_sources(mapi PRIVATE mapi.c - mapi_prompt.h PUBLIC $<BUILD_INTERFACE:$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>/mapi.h> $<BUILD_INTERFACE:$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>/mapi_querytype.h>
--- a/clients/odbc/tests/ODBCStmtAttr.c +++ b/clients/odbc/tests/ODBCStmtAttr.c @@ -87,7 +87,7 @@ StmtAttribute2name(SQLINTEGER attribute) case SQL_ATTR_QUERY_TIMEOUT: return "SQL_ATTR_QUERY_TIMEOUT"; default: - fprintf(stderr, "StmtAttribute2name: Unexpected value %d\n", attribute); + fprintf(stderr, "StmtAttribute2name: Unexpected value %ld\n", (long) attribute); return "NOT YET IMPLEMENTED"; } }
--- a/common/stream/CMakeLists.txt +++ b/common/stream/CMakeLists.txt @@ -21,7 +21,6 @@ target_sources(stream rw.c bstream.c bs.c - bs2.c stdio_stream.c winio.c compressed.c @@ -31,6 +30,7 @@ target_sources(stream lz4_stream.c url_stream.c socket_stream.c + mapi_stream.c memio.c callback.c blackhole.c @@ -41,6 +41,7 @@ target_sources(stream stream.h stream_internal.h stream_socket.h + mapi_prompt.h pump.h PUBLIC ${stream_public_headers})
--- a/common/stream/bs.c +++ b/common/stream/bs.c @@ -17,8 +17,6 @@ * indicated by an empty block (i.e. just a count of 0). */ -static ssize_t bs_read_internal(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt); - static bs * bs_create(void) { @@ -168,29 +166,6 @@ bs_flush(stream *ss, mnstr_flush_level f ssize_t bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt) { - ssize_t ret = bs_read_internal(ss, buf, elmsize, cnt); - if (ret != 0 || ss->eof) - return ret; - - bs *b = (bs*) ss-> stream_data.p; - if (b->prompt == NULL || b->pstream == NULL) - return 0; - - // before returning the 0 we send the prompt and make another attempt. - if (mnstr_write(b->pstream, b->prompt, strlen(b->prompt), 1) != 1) - return -1; - if (mnstr_flush(b->pstream, MNSTR_FLUSH_DATA) < 0) - return -1; - - // if it succeeds, return that to the client. - // if it's still a block boundary, return that to the client. - // if there's an error, return that to the client. - return bs_read_internal(ss, buf, elmsize, cnt); -} - -static ssize_t -bs_read_internal(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt) -{ bs *s; size_t todo = cnt * elmsize; size_t n; @@ -397,13 +372,3 @@ block_stream(stream *s) return ns; } - -void -set_prompting(stream *block_stream, const char *prompt, stream *prompt_stream) -{ - if (isa_block_stream(block_stream)) { - bs *bs = block_stream->stream_data.p; - bs->prompt = prompt; - bs->pstream = prompt_stream; - } -}
deleted file mode 100644 --- a/common/stream/bs2.c +++ /dev/null @@ -1,670 +0,0 @@ -/* - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. - * - * Copyright 1997 - July 2008 CWI, August 2008 - 2022 MonetDB B.V. - */ - -#include "monetdb_config.h" -#include "stream.h" -#include "stream_internal.h" - - -/* ------------------------------------------------------------------ */ -typedef struct bs2 { - stream *s; /* underlying stream */ - size_t nr; /* how far we got in buf */ - size_t itotal; /* amount available in current read block */ - size_t bufsiz; - size_t readpos; - compression_method comp; - char *compbuf; - size_t compbufsiz; - char *buf; -} bs2; - - -static ssize_t -compress_stream_data(bs2 *s) -{ - assert(s->comp != COMPRESSION_NONE); - if (s->comp == COMPRESSION_SNAPPY) { -#ifdef HAVE_SNAPPY - size_t compressed_length = s->compbufsiz; - snappy_status ret; - if ((ret = snappy_compress(s->buf, s->nr, s->compbuf, &compressed_length)) != SNAPPY_OK) { - return -1; - } - return compressed_length; -#else - assert(0); - return -1; -#endif - } else if (s->comp == COMPRESSION_LZ4) { -#ifdef HAVE_LIBLZ4 - int compressed_length = (int) s->compbufsiz; - assert(s->nr < INT_MAX); - if ((compressed_length = LZ4_compress_fast(s->buf, s->compbuf, (int)s->nr, compressed_length, 1)) == 0) { - return -1; - } - return compressed_length; -#else - assert(0); - return -1; -#endif - } - return -1; -} - - -static ssize_t -decompress_stream_data(bs2 *s) -{ - assert(s->comp != COMPRESSION_NONE); - if (s->comp == COMPRESSION_SNAPPY) { -#ifdef HAVE_SNAPPY - snappy_status ret; - size_t uncompressed_length = s->bufsiz; - if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) { - return -1; - } - return (ssize_t) uncompressed_length; -#else - assert(0); - return -1; -#endif - } else if (s->comp == COMPRESSION_LZ4) { -#ifdef HAVE_LIBLZ4 - int uncompressed_length = (int) s->bufsiz; - assert(s->itotal < INT_MAX); - if ((uncompressed_length = LZ4_decompress_safe(s->compbuf, s->buf, (int)s->itotal, uncompressed_length)) <= 0) { - return -1; - } - return uncompressed_length; -#else - assert(0); - return -1; -#endif - } - return -1; -} - -static ssize_t -compression_size_bound(bs2 *s) -{ - if (s->comp == COMPRESSION_NONE) { - return 0; - } else if (s->comp == COMPRESSION_SNAPPY) { -#ifndef HAVE_SNAPPY - return -1; -#else - return snappy_max_compressed_length(s->bufsiz); -#endif - } else if (s->comp == COMPRESSION_LZ4) { -#ifndef HAVE_LIBLZ4 - return -1; -#else - assert(s->bufsiz < INT_MAX); - return LZ4_compressBound((int)s->bufsiz); -#endif - } - return -1; -} - -static bs2 * -bs2_create(stream *s, size_t bufsiz, compression_method comp) -{ - /* should be a binary stream */ - bs2 *ns; - ssize_t compress_bound = 0; - - if ((ns = malloc(sizeof(*ns))) == NULL) - return NULL; - *ns = (bs2) { - .buf = malloc(bufsiz), - .s = s, - .bufsiz = bufsiz, - .comp = comp, - }; - if (ns->buf == NULL) { - free(ns); - return NULL; - } - - compress_bound = compression_size_bound(ns); - if (compress_bound > 0) { - ns->compbufsiz = (size_t) compress_bound; - ns->compbuf = malloc(ns->compbufsiz); - if (!ns->compbuf) { - free(ns->buf); - free(ns); - return NULL; - } - } else if (compress_bound < 0) { - free(ns->buf); - free(ns); - return NULL; - } - return ns; -} - -/* Collect data until the internal buffer is filled, then write the - * filled buffer to the underlying stream. - * Struct field usage: - * s - the underlying stream; - * buf - the buffer in which data is collected; - * nr - how much of buf is already filled (if nr == sizeof(buf) the - * data is written to the underlying stream, so upon entry nr < - * sizeof(buf)); - * itotal - unused. - */ -ssize_t -bs2_write(stream *restrict ss, const void *restrict buf, size_t elmsize, size_t cnt) -{ - bs2 *s; - size_t todo = cnt * elmsize; - int64_t blksize; - char *writebuf; - size_t writelen; - - s = (bs2 *) ss->stream_data.p; - if (s == NULL) - return -1; - assert(!ss->readonly); - assert(s->nr < s->bufsiz); - while (todo > 0) { - size_t n = s->bufsiz - s->nr; - - if (todo < n) - n = todo; - memcpy(s->buf + s->nr, buf, n); - s->nr += n; - todo -= n; - buf = ((const char *) buf + n); - /* block is full, write it to the stream */ - if (s->nr == s->bufsiz) { - -#ifdef BSTREAM_DEBUG - { - size_t i; - - fprintf(stderr, "W %s %zu \"", ss->name, s->nr); - for (i = 0; i < s->nr; i++) - if (' ' <= s->buf[i] && s->buf[i] < 127) - putc(s->buf[i], stderr); - else - fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]); - fprintf(stderr, "\"\n"); - } -#endif - - writelen = s->nr; - blksize = (int64_t) s->nr; - writebuf = s->buf; - - if (s->comp != COMPRESSION_NONE) { - ssize_t compressed_length = compress_stream_data(s); - if (compressed_length < 0) { - return -1; - } - writebuf = s->compbuf; - blksize = (int64_t) compressed_length; - writelen = (size_t) compressed_length; - } - - - /* the last bit tells whether a flush is in there, it's not - * at this moment, so shift it to the left */ - blksize <<= 1; - if (!mnstr_writeLng(s->s, blksize) || - s->s->write(s->s, writebuf, 1, writelen) != (ssize_t) writelen) { - mnstr_copy_error(ss, s->s); - return -1; - } - s->nr = 0; - } - } - return (ssize_t) cnt; -} - -/* If the internal buffer is partially filled, write it to the - * underlying stream. Then in any case write an empty buffer to the - * underlying stream to indicate to the receiver that the data was - * flushed. - */ -static int -bs2_flush(stream *ss, mnstr_flush_level flush_level) -{ - int64_t blksize; - bs2 *s; - char *writebuf; - size_t writelen; - - s = (bs2 *) ss->stream_data.p; - if (s == NULL) - return -1; - assert(!ss->readonly); - assert(s->nr < s->bufsiz); - if (!ss->readonly) { - /* flush the rest of buffer (if s->nr > 0), then set the - * last bit to 1 to to indicate user-instigated flush */ -#ifdef BSTREAM_DEBUG - if (s->nr > 0) { - size_t i; - - fprintf(stderr, "W %s %zu \"", ss->name, s->nr); - for (i = 0; i < s->nr; i++) - if (' ' <= s->buf[i] && s->buf[i] < 127) - putc(s->buf[i], stderr); - else - fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]); - fprintf(stderr, "\"\n"); - fprintf(stderr, "W %s 0\n", ss->name); - } -#endif - - writelen = s->nr; - blksize = (int64_t) s->nr; - writebuf = s->buf; - - if (s->nr > 0 && s->comp != COMPRESSION_NONE) { - ssize_t compressed_length = compress_stream_data(s); - if (compressed_length < 0) { - return -1; - } - writebuf = s->compbuf; - blksize = (int64_t) compressed_length; - writelen = (size_t) compressed_length; - } - - /* indicate that this is the last buffer of a block by - * setting the low-order bit */ - blksize <<= 1; - blksize |= 1; - /* always flush (even empty blocks) needed for the protocol) */ - - if ((!mnstr_writeLng(s->s, blksize) || - (s->nr > 0 && - s->s->write(s->s, writebuf, 1, writelen) != (ssize_t) writelen))) { - mnstr_copy_error(ss, s->s); - return -1; - } - s->nr = 0; - // shouldn't we flush s->s too? - (void) flush_level; - } - return 0; -} - -/* Read buffered data and return the number of items read. At the - * flush boundary we will return 0 to indicate the end of a block. - * - * Structure field usage: - * s - the underlying stream; - * buf - not used; - * itotal - the amount of data in the current block that hasn't yet - * been read; - * nr - indicates whether the flush marker has to be returned. - */ -ssize_t -bs2_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt) -{ - bs2 *s; - size_t todo = cnt * elmsize; - size_t n; - - s = (bs2 *) ss->stream_data.p; - if (s == NULL) - return -1; - assert(ss->readonly); - assert(s->nr <= 1); - - if (s->itotal == 0) { - int64_t blksize = 0; - - if (s->nr) { - /* We read the closing block but hadn't - * returned that yet. Return it now, and note - * that we did by setting s->nr to 0. */ - assert(s->nr == 1); - s->nr = 0; - return 0; - } - - assert(s->nr == 0); - - /* There is nothing more to read in the current block, - * so read the count for the next block */ - switch (mnstr_readLng(s->s, &blksize)) { - case -1: - mnstr_copy_error(ss, s->s); - return -1; - case 0: - ss->eof |= s->s->eof; - return 0; - case 1: - break; - } - if (blksize < 0) { - mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %" PRId64 "", blksize); - return -1; - } -#ifdef BSTREAM_DEBUG - fprintf(stderr, "R1 '%s' length: %" PRId64 ", final: %s\n", ss->name, blksize >> 1, blksize & 1 ? "true" : "false"); -#endif - s->itotal = (size_t) (blksize >> 1); /* amount readable */ - /* store whether this was the last block or not */ - s->nr = blksize & 1; - - if (s->itotal > 0) { - /* read everything into the comp buf */ - ssize_t uncompressed_length = (ssize_t) s->bufsiz; - size_t m = 0; - char *buf = s->buf; - - if (s->comp != COMPRESSION_NONE) { - buf = s->compbuf; - } - - while (m < s->itotal) { - ssize_t bytes_read = 0; - bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m); - if (bytes_read <= 0) { - ss->eof |= s->s->eof; - mnstr_copy_error(ss, s->s); - return -1; - } - m += (size_t) bytes_read; - } - if (s->comp != COMPRESSION_NONE) { - uncompressed_length = decompress_stream_data(s); - if (uncompressed_length < 0) { - if (s->s->errkind != MNSTR_NO__ERROR) - mnstr_copy_error(ss, s->s); - else - mnstr_set_error(ss, MNSTR_READ_ERROR, "uncompress failed with code %d", (int) uncompressed_length); - return -1; - } - } else { - uncompressed_length = (ssize_t) m; - } - s->itotal = (size_t) uncompressed_length; - s->readpos = 0; - } - } - - /* Fill the caller's buffer. */ - cnt = 0; /* count how much we put into the buffer */ - while (todo > 0) { - /* there is more data waiting in the current block, so - * read it */ - n = todo < s->itotal ? todo : s->itotal; - - memcpy(buf, s->buf + s->readpos, n); - buf = (void *) ((char *) buf + n); - cnt += n; - todo -= n; - s->readpos += n; - s->itotal -= n; - - if (s->itotal == 0) { - int64_t blksize = 0; - - /* The current block has been completely read, - * so read the count for the next block, only - * if the previous was not the last one */ - if (s->nr) - break; - switch (mnstr_readLng(s->s, &blksize)) { - case -1: - mnstr_copy_error(ss, s->s); - return -1; - case 0: - ss->eof |= s->s->eof; - return 0; - case 1: - break; - } - if (blksize < 0) { - mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %" PRId64 "", blksize); - return -1; - } -#ifdef BSTREAM_DEBUG - fprintf(stderr, "R3 '%s' length: %" PRId64 ", final: %s\n", ss->name, blksize >> 1, blksize & 1 ? "true" : "false"); -#endif - - - s->itotal = (size_t) (blksize >> 1); /* amount readable */ - /* store whether this was the last block or not */ - s->nr = blksize & 1; - - if (s->itotal > 0) { - /* read everything into the comp buf */ - ssize_t uncompressed_length = (ssize_t) s->bufsiz; - size_t m = 0; - char *buf = s->buf; - - if (s->comp != COMPRESSION_NONE) { - buf = s->compbuf; - } - - while (m < s->itotal) { - ssize_t bytes_read = 0; - bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m); - if (bytes_read <= 0) { - ss->eof |= s->s->eof; - mnstr_copy_error(ss, s->s); - return -1; - } - m += (size_t) bytes_read; - } - if (s->comp != COMPRESSION_NONE) { - uncompressed_length = decompress_stream_data(s); - if (uncompressed_length < 0) { - if (s->s->errkind != MNSTR_NO__ERROR) - mnstr_copy_error(ss, s->s); - else - mnstr_set_error(ss, MNSTR_READ_ERROR, "uncompress failed with code %d", (int) uncompressed_length); - return -1; - } - } else { - uncompressed_length = (ssize_t) m; - } - s->itotal = (size_t) uncompressed_length; - s->readpos = 0; - } - } - } - /* if we got an empty block with the end-of-sequence marker - * set (low-order bit) we must only return an empty read once, - * so we must squash the flag that we still have to return an - * empty read */ - if (todo > 0 && cnt == 0) - s->nr = 0; - return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0); -} - - - -static void -bs2_resetbuf(stream *ss) -{ - bs2 *s = (bs2 *) ss->stream_data.p; - assert(ss->read == bs2_read); - s->itotal = 0; - s->nr = 0; - s->readpos = 0; -} - -int -bs2_resizebuf(stream *ss, size_t bufsiz) -{ - ssize_t compress_bound; - bs2 *s = (bs2 *) ss->stream_data.p; - assert(ss->read == bs2_read); - - if (s->buf) - free(s->buf); - if (s->compbuf) - free(s->compbuf); - - s->bufsiz = 0; - s->buf = NULL; - s->compbuf = NULL; - - if ((s->buf = malloc(bufsiz)) == NULL) { - return -1; - } - s->bufsiz = bufsiz; - compress_bound = compression_size_bound(s); - if (compress_bound > 0) { - s->compbufsiz = (size_t) compress_bound; - s->compbuf = malloc(s->compbufsiz); - if (!s->compbuf) { - free(s->buf); - s->buf = NULL; - return -1; - } - } - bs2_resetbuf(ss); - return 0; -} - -buffer -bs2_buffer(stream *ss) -{ - bs2 *s = (bs2 *) ss->stream_data.p; - buffer b; - assert(ss->read == bs2_read); - b.buf = s->buf; - b.pos = s->nr; - b.len = s->itotal; - return b; -} - -void -bs2_setpos(stream *ss, size_t pos) -{ - bs2 *s = (bs2 *) ss->stream_data.p; - assert(pos < s->bufsiz); - s->nr = pos; -} - - - - -static void -bs2_close(stream *ss) -{ - bs2 *s; - - s = (bs2 *) ss->stream_data.p; - assert(s); - if (s == NULL) - return; - if (!ss->readonly && s->nr > 0) - bs2_flush(ss, MNSTR_FLUSH_DATA); - assert(s->s); - if (s->s) - s->s->close(s->s); -} - -static void -bs2_destroy(stream *ss) -{ - bs2 *s; - - s = (bs2 *) ss->stream_data.p; - assert(s); - if (s) { - assert(s->s); - if (s->s) - s->s->destroy(s->s); - if (s->buf) - free(s->buf); - if (s->compbuf) - free(s->compbuf); - free(s); - } - destroy_stream(ss); -} - -static void -bs2_update_timeout(stream *ss) -{ - bs2 *s; - - if ((s = ss->stream_data.p) != NULL && s->s) { - s->s->timeout = ss->timeout; - s->s->timeout_func = ss->timeout_func; - s->s->timeout_data = ss->timeout_data; - if (s->s->update_timeout) - s->s->update_timeout(s->s); - } -} - -static int -bs2_isalive(const stream *ss) -{ - struct bs2 *s; - - if ((s = ss->stream_data.p) != NULL && s->s) { - if (s->s->isalive) - return s->s->isalive(s->s); - return 1; - } - return 0; -} - -stream * -block_stream2(stream *s, size_t bufsiz, compression_method comp) -{ - stream *ns; - stream *os = NULL; - bs2 *b; - - if (s == NULL) - return NULL; - if (s->read == bs_read || s->write == bs_write) { - /* if passed in a block_stream instance, extract the - * underlying stream */ - os = s; - s = s->inner; - } - -#ifdef STREAM_DEBUG - fprintf(stderr, "block_stream2 %s\n", s->name ? s->name : "<unnamed>"); -#endif - if ((ns = create_wrapper_stream(NULL, s)) == NULL) - return NULL; - if ((b = bs2_create(s, bufsiz, comp)) == NULL) { - destroy_stream(ns); - mnstr_set_open_error(s->name, 0, "bs2_create failed"); - return NULL; - } - /* blocksizes have a fixed little endian byteorder */ -#ifdef WORDS_BIGENDIAN - s->swapbytes = true; -#endif - ns->binary = s->binary; - ns->readonly = s->readonly; - ns->close = bs2_close; - ns->clrerr = bs_clrerr; - ns->destroy = bs2_destroy; - ns->flush = bs2_flush; - ns->read = bs2_read; - ns->write = bs2_write; - ns->update_timeout = bs2_update_timeout; - ns->isalive = bs2_isalive; - ns->stream_data.p = (void *) b; - - if (os != NULL) { - /* we extracted the underlying stream, destroy the old - * shell */ - os->inner = NULL; - bs_destroy(os); - } - - return ns; -}
new file mode 100644 --- /dev/null +++ b/common/stream/mapi_stream.c @@ -0,0 +1,154 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Copyright 1997 - July 2008 CWI, August 2008 - 2022 MonetDB B.V. + */ + +#include "monetdb_config.h" +#include "stream.h" +#include "stream_internal.h" +#include "mapi_prompt.h" + + +static void +discard(stream *s) +{ + static char bitbucket[8192]; + while (1) { + ssize_t nread = mnstr_read(s, bitbucket, 1, sizeof(bitbucket)); + if (nread <= 0) + return; + assert(1); + } +} + +struct mapi_recv_upload { + stream *from_client; // set to NULL after sending MAPI_PROMPT3 + stream *to_client; // set to NULL when client sends empty +}; + +static ssize_t +recv_upload_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt) +{ + struct mapi_recv_upload *state = s->stream_data.p; + + if (state->from_client == NULL) { + assert(s->eof); + return 0; + } + + ssize_t nread = mnstr_read(state->from_client, buf, elmsize, cnt); + if (nread != 0 || state->from_client->eof) + return nread; + + // before returning the 0 we send the prompt and make another attempt. + if ( + mnstr_write(state->to_client, PROMPT2, strlen(PROMPT2), 1) != 1 + || mnstr_flush(state->to_client, MNSTR_FLUSH_ALL) < 0 + ) { + mnstr_set_error(s, mnstr_errnr(state->to_client), "%s", mnstr_peek_error(state->to_client)); + return -1; + } + + // if it succeeds, return that to the client. + // if it's still a block boundary, return that to the client. + // if there's an error, return that to the client. + nread = mnstr_read(state->from_client, buf, elmsize, cnt); + if (nread > 0) + return nread; + if (nread == 0) { + s->eof = true; + state->from_client = NULL; + return nread; + } else { + mnstr_set_error(s, mnstr_errnr(state->from_client), "%s", mnstr_peek_error(state->from_client)); + return -1; + } +} + +static void +recv_upload_close(stream *s) +{ + struct mapi_recv_upload *state = s->stream_data.p; + + stream *from = state->from_client; + if (from) + discard(from); + + stream *to = state->to_client; + mnstr_write(to, PROMPT3, strlen(PROMPT3), 1); + mnstr_flush(to, MNSTR_FLUSH_ALL); +} + +static void +recv_upload_destroy(stream *s) +{ + struct mapi_recv_upload *state = s->stream_data.p; + free(state); + free(s); +} + + +stream* +mapi_request_upload(const char *filename, bool binary, bstream *bs, stream *ws) +{ + const char *msg = NULL; + stream *s = NULL; + struct mapi_recv_upload *state = NULL; + ssize_t nwritten; + + while (!bs->eof) + bstream_next(bs); + stream *rs = bs->s; + assert(isa_block_stream(ws)); + assert(isa_block_stream(rs)); + + if (binary) + nwritten = mnstr_printf(ws, "%srb %s\n", PROMPT3, filename); + else + nwritten = mnstr_printf(ws, "%sr 0 %s\n", PROMPT3, filename); + if (nwritten <= 0) { + msg = mnstr_peek_error(ws); + goto end; + } + if (mnstr_flush(ws, MNSTR_FLUSH_ALL) < 0) { + msg = mnstr_peek_error(ws); + goto end; + } + + char buf[256]; + if (mnstr_readline(rs, buf, sizeof(buf)) != 1 || buf[0] != '\n') { + msg = buf; + discard(rs); + goto end; + } + + // Client accepted the request + state = malloc(sizeof(*state)); + if (!state) { + msg = "malloc failed"; + goto end; + } + s = create_stream("ONCLIENT"); + if (!s) { + msg = mnstr_peek_error(NULL); + goto end; + } + state->from_client = rs; + state->to_client = ws; + s->stream_data.p = state; + s->binary= binary; + s->read = recv_upload_read; + s->close = recv_upload_close; + s->destroy = recv_upload_destroy; +end: + if (msg) { + mnstr_destroy(s); + mnstr_set_open_error(filename, 0, "ON CLIENT: %s", msg); + return NULL; + } else { + return s; + } +}
--- a/common/stream/stream.c +++ b/common/stream/stream.c @@ -954,9 +954,7 @@ isa_block_stream(const stream *s) assert(s != NULL); return s && ((s->read == bs_read || - s->write == bs_write) || - (s->read == bs2_read || - s->write == bs2_write)); + s->write == bs_write)); }
--- a/common/stream/stream.h +++ b/common/stream/stream.h @@ -219,8 +219,6 @@ stream_export buffer *mnstr_get_buffer(s stream_export stream *block_stream(stream *s); // mapi.c, mal_mapi.c, client.c, merovingian stream_export bool isa_block_stream(const stream *s); // mapi.c, mal_client.c, remote.c, sql_scenario.c/sqlReader, sql_scan.c stream_export stream *bs_stream(stream *s); // unused -stream_export void set_prompting(stream *block_stream, const char *prompt, stream *prompt_stream); - typedef enum { PROTOCOL_AUTO = 0, // unused @@ -228,18 +226,6 @@ typedef enum { PROTOCOL_COLUMNAR = 3 // sql_result.c } protocol_version; -typedef enum { - COMPRESSION_NONE = 0, // mal_mapi.c - COMPRESSION_SNAPPY = 1, // mcrypt.c, mal_mapi.c - COMPRESSION_LZ4 = 2, // same - COMPRESSION_AUTO = 255 // never used -} compression_method; - -stream_export stream *block_stream2(stream *s, size_t bufsiz, compression_method comp); // mal_mapi.c -stream_export int bs2_resizebuf(stream *ss, size_t bufsiz); // sql_result.c -stream_export buffer bs2_buffer(stream *s); // sql_result.c -stream_export void bs2_setpos(stream *ss, size_t pos); // sql_result.c - /* read block of data including the end of block marker */ stream_export ssize_t mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt); @@ -276,7 +262,8 @@ stream_export stream *stream_blackhole_c stream_export stream *stream_fwf_create(stream *restrict s, size_t num_fields, size_t *restrict widths, char filler); // sql.c - stream_export stream *create_text_stream(stream *s); +stream_export stream *mapi_request_upload(const char *filename, bool binary, bstream *rs, stream *ws); + #endif /*_STREAM_H_*/
--- a/common/stream/stream_internal.h +++ b/common/stream/stream_internal.h @@ -271,8 +271,6 @@ struct bs { unsigned itotal; /* amount available in current read block */ size_t blks; /* read/writen blocks (possibly partial) */ size_t bytes; /* read/writen bytes */ - const char *prompt; /* on eof, first try to send this then try again */ - stream *pstream; /* stream to send prompts on */ char buf[BLOCK]; /* the buffered data (minus the size of * size-short */ };
--- a/sql/backends/monet5/sql.c +++ b/sql/backends/monet5/sql.c @@ -3101,7 +3101,6 @@ mvc_import_table_wrap(Client cntxt, MalB { backend *be; BAT **b = NULL; - ssize_t len = 0; sql_table *t = *(sql_table **) getArgReference(stk, pci, pci->retc + 0); const char *tsep = *getArgReference_str(stk, pci, pci->retc + 1); const char *rsep = *getArgReference_str(stk, pci, pci->retc + 2); @@ -3137,41 +3136,14 @@ mvc_import_table_wrap(Client cntxt, MalB msg = mvc_import_table(cntxt, &b, be->mvc, be->mvc->scanner.rs, t, tsep, rsep, ssep, ns, sz, offset, besteffort, true, escape); } else { if (onclient) { - mnstr_write(be->mvc->scanner.ws, PROMPT3, sizeof(PROMPT3)-1, 1); - if (offset > 1 && rsep && rsep[0] == '\n' && rsep[1] == '\0') { - /* only let client skip simple lines */ - mnstr_printf(be->mvc->scanner.ws, "r " LLFMT " %s\n", - offset, fname); - offset = 0; - } else { - mnstr_printf(be->mvc->scanner.ws, "r 0 %s\n", fname); - } - msg = MAL_SUCCEED; - mnstr_flush(be->mvc->scanner.ws, MNSTR_FLUSH_DATA); - while (!be->mvc->scanner.rs->eof) - bstream_next(be->mvc->scanner.rs); - ss = be->mvc->scanner.rs->s; - char buf[80]; - if ((len = mnstr_readline(ss, buf, sizeof(buf))) > 1) { - if (buf[0] == '!' && buf[6] == '!') - msg = createException(IO, "sql.copy_from", "%.7s%s: %s", buf, fname, buf+7); - else - msg = createException(IO, "sql.copy_from", "%s: %s", fname, buf); - while (buf[len - 1] != '\n' && - (len = mnstr_readline(ss, buf, sizeof(buf))) > 0) - ; - /* read until flush marker */ - while (mnstr_read(ss, buf, 1, sizeof(buf)) > 0) - ; - return msg; - } + ss = mapi_request_upload(fname, false, be->mvc->scanner.rs, be->mvc->scanner.ws); } else { ss = open_rastream(fname); - if (ss == NULL || mnstr_errnr(ss)) { - msg = createException(IO, "sql.copy_from", SQLSTATE(42000) "%s", mnstr_peek_error(NULL)); - close_stream(ss); - return msg; - } + } + if (ss == NULL || mnstr_errnr(ss)) { + msg = createException(IO, "sql.copy_from", SQLSTATE(42000) "%s", mnstr_peek_error(NULL)); + close_stream(ss); + return msg; } if (!strNil(fixed_widths)) { @@ -3220,12 +3192,7 @@ mvc_import_table_wrap(Client cntxt, MalB throw(MAL, "sql.copy_from", SQLSTATE(HY013) MAL_MALLOC_FAIL); } msg = mvc_import_table(cntxt, &b, be->mvc, s, t, tsep, rsep, ssep, ns, sz, offset, besteffort, false, escape); - if (onclient) { - mnstr_write(be->mvc->scanner.ws, PROMPT3, sizeof(PROMPT3)-1, 1); - mnstr_flush(be->mvc->scanner.ws, MNSTR_FLUSH_DATA); - be->mvc->scanner.rs->eof = s->eof; - s->s = NULL; - } + // This also closes ss: bstream_destroy(s); } if (b && !msg)
--- a/sql/backends/monet5/sql_bincopyfrom.c +++ b/sql/backends/monet5/sql_bincopyfrom.c @@ -543,70 +543,6 @@ load_column(struct type_rec *rec, const return msg; } - -static str -start_mapi_file_upload(backend *be, str path, stream **s) -{ - str msg = MAL_SUCCEED; - *s = NULL; - - stream *ws = be->mvc->scanner.ws; - bstream *bs = be->mvc->scanner.rs; - stream *rs = bs->s; - assert(isa_block_stream(ws)); - assert(isa_block_stream(rs)); - - mnstr_write(ws, PROMPT3, sizeof(PROMPT3)-1, 1); - mnstr_printf(ws, "rb %s\n", path); - mnstr_flush(ws, MNSTR_FLUSH_DATA); - while (!bs->eof) - bstream_next(bs); - char buf[80]; - if (mnstr_readline(rs, buf, sizeof(buf)) > 1) { - msg = createException(IO, "sql.importColumn", "Error %s", buf); - goto end; - } - set_prompting(rs, PROMPT2, ws); - - *s = rs; -end: - return msg; -} - - -static str -finish_mapi_file_upload(backend *be, bool eof_reached) -{ - str msg = MAL_SUCCEED; - stream *ws = be->mvc->scanner.ws; - bstream *bs = be->mvc->scanner.rs; - stream *rs = bs->s; - assert(isa_block_stream(ws)); - assert(isa_block_stream(rs)); - - set_prompting(rs, NULL, NULL); - if (!eof_reached) { - // Probably due to an error. Read until message boundary. - char buf[8190]; - while (1) { - ssize_t nread = mnstr_read(rs, buf, 1, sizeof(buf)); - if (nread > 0) - continue; - if (nread < 0) - msg = createException( - IO, "sql.importColumn", - "while syncing read stream: %s", mnstr_peek_error(rs)); - break; - } - } - mnstr_write(ws, PROMPT3, sizeof(PROMPT3)-1, 1); - mnstr_flush(ws, MNSTR_FLUSH_DATA); - - return msg; -} - - - /* Import a single file into a new BAT. */ static str @@ -620,7 +556,6 @@ importColumn(backend *be, bat *ret, BUN int gdk_type; BAT *bat = NULL; stream *stream_to_close = NULL; - bool do_finish_mapi = false; int eof_reached = -1; // 1 = read to the end; 0 = stopped reading early; -1 = unset, a bug. // This one is not managed by the end: block @@ -645,15 +580,13 @@ importColumn(backend *be, bat *ret, BUN // Open the input stream if (onclient) { - s = NULL; - do_finish_mapi = true; - msg = start_mapi_file_upload(be, path, &s); - if (msg != MAL_SUCCEED) - goto end; + s = stream_to_close = mapi_request_upload(path, true, be->mvc->scanner.rs, be->mvc->scanner.ws); } else { s = stream_to_close = open_rstream(path); - if (s == NULL) - bailout("Couldn't open '%s' on server: %s", path, mnstr_peek_error(NULL)); + } + if (!s) { + msg = mnstr_error(NULL); + goto end; } // Do the work @@ -667,12 +600,6 @@ importColumn(backend *be, bat *ret, BUN // Fall through into the end block which will clean things up end: - if (do_finish_mapi) { - str msg1 = finish_mapi_file_upload(be, eof_reached == 1); - if (msg == MAL_SUCCEED) - msg = msg1; - } - if (stream_to_close) close_stream(stream_to_close);
