changeset 86134:1b8c0e0b3295 sql_profiler

Merge with default
author Lucas Pereira <lucas.pereira@monetdbsolutions.com>
date Thu, 21 Jul 2022 12:59:45 +0100
parents 1395d225bbe3 (current diff) 786ade607ae4 (diff)
children
files clients/Tests/exports.stable.out sql/backends/monet5/sql.c
diffstat 14 files changed, 191 insertions(+), 869 deletions(-) [+]
line wrap: on
line diff
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1769,10 +1769,6 @@ int sql_trans_create_table(sql_table **t
 
 # stream
 stream *block_stream(stream *s);
-stream *block_stream2(stream *s, size_t bufsiz, compression_method comp);
-buffer bs2_buffer(stream *s);
-int bs2_resizebuf(stream *ss, size_t bufsiz);
-void bs2_setpos(stream *ss, size_t pos);
 stream *bs_stream(stream *s);
 bstream *bstream_create(stream *rs, size_t chunk_size);
 void bstream_destroy(bstream *s);
@@ -1799,6 +1795,7 @@ stream *iconv_rstream(stream *restrict s
 stream *iconv_wstream(stream *restrict ss, const char *restrict charset, const char *restrict name);
 bool isa_block_stream(const stream *s);
 stream *lz4_stream(stream *inner, int preset);
+stream *mapi_request_upload(const char *filename, bool binary, bstream *rs, stream *ws);
 void mnstr_clearerr(stream *s);
 void mnstr_close(stream *s);
 void mnstr_destroy(stream *s);
@@ -1856,7 +1853,6 @@ stream *open_rstream(const char *filenam
 stream *open_urlstream(const char *url);
 stream *open_wastream(const char *filename);
 stream *open_wstream(const char *filename);
-void set_prompting(stream *block_stream, const char *prompt, stream *prompt_stream);
 stream *socket_rstream(SOCKET socket, const char *name);
 stream *socket_wstream(SOCKET socket, const char *name);
 stream *stderr_wastream(void);
--- a/clients/mapilib/CMakeLists.txt
+++ b/clients/mapilib/CMakeLists.txt
@@ -17,7 +17,6 @@ add_library(mapi
 target_sources(mapi
   PRIVATE
   mapi.c
-  mapi_prompt.h
   PUBLIC
   $<BUILD_INTERFACE:$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>/mapi.h>
   $<BUILD_INTERFACE:$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>/mapi_querytype.h>
--- a/clients/odbc/tests/ODBCStmtAttr.c
+++ b/clients/odbc/tests/ODBCStmtAttr.c
@@ -87,7 +87,7 @@ StmtAttribute2name(SQLINTEGER attribute)
 	case SQL_ATTR_QUERY_TIMEOUT:
 		return "SQL_ATTR_QUERY_TIMEOUT";
 	default:
-		fprintf(stderr, "StmtAttribute2name: Unexpected value %d\n", attribute);
+		fprintf(stderr, "StmtAttribute2name: Unexpected value %ld\n", (long) attribute);
 		return "NOT YET IMPLEMENTED";
 	}
 }
--- a/common/stream/CMakeLists.txt
+++ b/common/stream/CMakeLists.txt
@@ -21,7 +21,6 @@ target_sources(stream
   rw.c
   bstream.c
   bs.c
-  bs2.c
   stdio_stream.c
   winio.c
   compressed.c
@@ -31,6 +30,7 @@ target_sources(stream
   lz4_stream.c
   url_stream.c
   socket_stream.c
+  mapi_stream.c
   memio.c
   callback.c
   blackhole.c
@@ -41,6 +41,7 @@ target_sources(stream
   stream.h
   stream_internal.h
   stream_socket.h
+  mapi_prompt.h
   pump.h
   PUBLIC
   ${stream_public_headers})
--- a/common/stream/bs.c
+++ b/common/stream/bs.c
@@ -17,8 +17,6 @@
  * indicated by an empty block (i.e. just a count of 0).
  */
 
-static ssize_t bs_read_internal(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt);
-
 static bs *
 bs_create(void)
 {
@@ -168,29 +166,6 @@ bs_flush(stream *ss, mnstr_flush_level f
 ssize_t
 bs_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
 {
-	ssize_t ret = bs_read_internal(ss, buf, elmsize, cnt);
-	if (ret != 0 || ss->eof)
-		return ret;
-
-	bs *b = (bs*) ss-> stream_data.p;
-	if (b->prompt == NULL || b->pstream == NULL)
-		return 0;
-
-	// before returning the 0 we send the prompt and make another attempt.
-	if (mnstr_write(b->pstream, b->prompt, strlen(b->prompt), 1) != 1)
-		return -1;
-	if (mnstr_flush(b->pstream, MNSTR_FLUSH_DATA) < 0)
-		return -1;
-
-	// if it succeeds, return that to the client.
-	// if it's still a block boundary, return that to the client.
-	// if there's an error, return that to the client.
-	return bs_read_internal(ss, buf, elmsize, cnt);
-}
-
-static ssize_t
-bs_read_internal(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
-{
 	bs *s;
 	size_t todo = cnt * elmsize;
 	size_t n;
@@ -397,13 +372,3 @@ block_stream(stream *s)
 
 	return ns;
 }
-
-void
-set_prompting(stream *block_stream, const char *prompt, stream *prompt_stream)
-{
-	if (isa_block_stream(block_stream)) {
-		bs *bs = block_stream->stream_data.p;
-		bs->prompt = prompt;
-		bs->pstream = prompt_stream;
-	}
-}
deleted file mode 100644
--- a/common/stream/bs2.c
+++ /dev/null
@@ -1,670 +0,0 @@
-/*
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0.  If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- *
- * Copyright 1997 - July 2008 CWI, August 2008 - 2022 MonetDB B.V.
- */
-
-#include "monetdb_config.h"
-#include "stream.h"
-#include "stream_internal.h"
-
-
-/* ------------------------------------------------------------------ */
-typedef struct bs2 {
-	stream *s;		/* underlying stream */
-	size_t nr;		/* how far we got in buf */
-	size_t itotal;		/* amount available in current read block */
-	size_t bufsiz;
-	size_t readpos;
-	compression_method comp;
-	char *compbuf;
-	size_t compbufsiz;
-	char *buf;
-} bs2;
-
-
-static ssize_t
-compress_stream_data(bs2 *s)
-{
-	assert(s->comp != COMPRESSION_NONE);
-	if (s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_SNAPPY
-		size_t compressed_length = s->compbufsiz;
-		snappy_status ret;
-		if ((ret = snappy_compress(s->buf, s->nr, s->compbuf, &compressed_length)) != SNAPPY_OK) {
-			return -1;
-		}
-		return compressed_length;
-#else
-		assert(0);
-		return -1;
-#endif
-	} else if (s->comp == COMPRESSION_LZ4) {
-#ifdef HAVE_LIBLZ4
-		int compressed_length = (int) s->compbufsiz;
-		assert(s->nr < INT_MAX);
-		if ((compressed_length = LZ4_compress_fast(s->buf, s->compbuf, (int)s->nr, compressed_length, 1)) == 0) {
-			return -1;
-		}
-		return compressed_length;
-#else
-		assert(0);
-		return -1;
-#endif
-	}
-	return -1;
-}
-
-
-static ssize_t
-decompress_stream_data(bs2 *s)
-{
-	assert(s->comp != COMPRESSION_NONE);
-	if (s->comp == COMPRESSION_SNAPPY) {
-#ifdef HAVE_SNAPPY
-		snappy_status ret;
-		size_t uncompressed_length = s->bufsiz;
-		if ((ret = snappy_uncompress(s->compbuf, s->itotal, s->buf, &uncompressed_length)) != SNAPPY_OK) {
-			return -1;
-		}
-		return (ssize_t) uncompressed_length;
-#else
-		assert(0);
-		return -1;
-#endif
-	} else if (s->comp == COMPRESSION_LZ4) {
-#ifdef HAVE_LIBLZ4
-		int uncompressed_length = (int) s->bufsiz;
-		assert(s->itotal < INT_MAX);
-		if ((uncompressed_length = LZ4_decompress_safe(s->compbuf, s->buf, (int)s->itotal, uncompressed_length)) <= 0) {
-			return -1;
-		}
-		return uncompressed_length;
-#else
-		assert(0);
-		return -1;
-#endif
-	}
-	return -1;
-}
-
-static ssize_t
-compression_size_bound(bs2 *s)
-{
-	if (s->comp == COMPRESSION_NONE) {
-		return 0;
-	} else if (s->comp == COMPRESSION_SNAPPY) {
-#ifndef HAVE_SNAPPY
-		return -1;
-#else
-		return snappy_max_compressed_length(s->bufsiz);
-#endif
-	} else if (s->comp == COMPRESSION_LZ4) {
-#ifndef HAVE_LIBLZ4
-		return -1;
-#else
-		assert(s->bufsiz < INT_MAX);
-		return LZ4_compressBound((int)s->bufsiz);
-#endif
-	}
-	return -1;
-}
-
-static bs2 *
-bs2_create(stream *s, size_t bufsiz, compression_method comp)
-{
-	/* should be a binary stream */
-	bs2 *ns;
-	ssize_t compress_bound = 0;
-
-	if ((ns = malloc(sizeof(*ns))) == NULL)
-		return NULL;
-	*ns = (bs2) {
-		.buf = malloc(bufsiz),
-		.s = s,
-		.bufsiz = bufsiz,
-		.comp = comp,
-	};
-	if (ns->buf == NULL) {
-		free(ns);
-		return NULL;
-	}
-
-	compress_bound = compression_size_bound(ns);
-	if (compress_bound > 0) {
-		ns->compbufsiz = (size_t) compress_bound;
-		ns->compbuf = malloc(ns->compbufsiz);
-		if (!ns->compbuf) {
-			free(ns->buf);
-			free(ns);
-			return NULL;
-		}
-	} else if (compress_bound < 0) {
-		free(ns->buf);
-		free(ns);
-		return NULL;
-	}
-	return ns;
-}
-
-/* Collect data until the internal buffer is filled, then write the
- * filled buffer to the underlying stream.
- * Struct field usage:
- * s - the underlying stream;
- * buf - the buffer in which data is collected;
- * nr - how much of buf is already filled (if nr == sizeof(buf) the
- *      data is written to the underlying stream, so upon entry nr <
- *      sizeof(buf));
- * itotal - unused.
- */
-ssize_t
-bs2_write(stream *restrict ss, const void *restrict buf, size_t elmsize, size_t cnt)
-{
-	bs2 *s;
-	size_t todo = cnt * elmsize;
-	int64_t blksize;
-	char *writebuf;
-	size_t writelen;
-
-	s = (bs2 *) ss->stream_data.p;
-	if (s == NULL)
-		return -1;
-	assert(!ss->readonly);
-	assert(s->nr < s->bufsiz);
-	while (todo > 0) {
-		size_t n = s->bufsiz - s->nr;
-
-		if (todo < n)
-			n = todo;
-		memcpy(s->buf + s->nr, buf, n);
-		s->nr += n;
-		todo -= n;
-		buf = ((const char *) buf + n);
-		/* block is full, write it to the stream */
-		if (s->nr == s->bufsiz) {
-
-#ifdef BSTREAM_DEBUG
-			{
-				size_t i;
-
-				fprintf(stderr, "W %s %zu \"", ss->name, s->nr);
-				for (i = 0; i < s->nr; i++)
-					if (' ' <= s->buf[i] && s->buf[i] < 127)
-						putc(s->buf[i], stderr);
-					else
-						fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
-				fprintf(stderr, "\"\n");
-			}
-#endif
-
-			writelen = s->nr;
-			blksize = (int64_t) s->nr;
-			writebuf = s->buf;
-
-			if (s->comp != COMPRESSION_NONE) {
-				ssize_t compressed_length = compress_stream_data(s);
-				if (compressed_length < 0) {
-					return -1;
-				}
-				writebuf = s->compbuf;
-				blksize = (int64_t) compressed_length;
-				writelen = (size_t) compressed_length;
-			}
-
-
-			/* the last bit tells whether a flush is in there, it's not
-			 * at this moment, so shift it to the left */
-			blksize <<= 1;
-			if (!mnstr_writeLng(s->s, blksize) ||
-			    s->s->write(s->s, writebuf, 1, writelen) != (ssize_t) writelen) {
-				mnstr_copy_error(ss, s->s);
-				return -1;
-			}
-			s->nr = 0;
-		}
-	}
-	return (ssize_t) cnt;
-}
-
-/* If the internal buffer is partially filled, write it to the
- * underlying stream.  Then in any case write an empty buffer to the
- * underlying stream to indicate to the receiver that the data was
- * flushed.
- */
-static int
-bs2_flush(stream *ss, mnstr_flush_level flush_level)
-{
-	int64_t blksize;
-	bs2 *s;
-	char *writebuf;
-	size_t writelen;
-
-	s = (bs2 *) ss->stream_data.p;
-	if (s == NULL)
-		return -1;
-	assert(!ss->readonly);
-	assert(s->nr < s->bufsiz);
-	if (!ss->readonly) {
-		/* flush the rest of buffer (if s->nr > 0), then set the
-		 * last bit to 1 to to indicate user-instigated flush */
-#ifdef BSTREAM_DEBUG
-		if (s->nr > 0) {
-			size_t i;
-
-			fprintf(stderr, "W %s %zu \"", ss->name, s->nr);
-			for (i = 0; i < s->nr; i++)
-				if (' ' <= s->buf[i] && s->buf[i] < 127)
-					putc(s->buf[i], stderr);
-				else
-					fprintf(stderr, "\\%03o", (unsigned char) s->buf[i]);
-			fprintf(stderr, "\"\n");
-			fprintf(stderr, "W %s 0\n", ss->name);
-		}
-#endif
-
-		writelen = s->nr;
-		blksize = (int64_t) s->nr;
-		writebuf = s->buf;
-
-		if (s->nr > 0 && s->comp != COMPRESSION_NONE) {
-			ssize_t compressed_length = compress_stream_data(s);
-			if (compressed_length < 0) {
-				return -1;
-			}
-			writebuf = s->compbuf;
-			blksize = (int64_t) compressed_length;
-			writelen = (size_t) compressed_length;
-		}
-
-		/* indicate that this is the last buffer of a block by
-		 * setting the low-order bit */
-		blksize <<= 1;
-		blksize |= 1;
-		/* always flush (even empty blocks) needed for the protocol) */
-
-		if ((!mnstr_writeLng(s->s, blksize) ||
-		     (s->nr > 0 &&
-		      s->s->write(s->s, writebuf, 1, writelen) != (ssize_t) writelen))) {
-			mnstr_copy_error(ss, s->s);
-			return -1;
-		}
-		s->nr = 0;
-		// shouldn't we flush s->s too?
-		(void) flush_level;
-	}
-	return 0;
-}
-
-/* Read buffered data and return the number of items read.  At the
- * flush boundary we will return 0 to indicate the end of a block.
- *
- * Structure field usage:
- * s - the underlying stream;
- * buf - not used;
- * itotal - the amount of data in the current block that hasn't yet
- *          been read;
- * nr - indicates whether the flush marker has to be returned.
- */
-ssize_t
-bs2_read(stream *restrict ss, void *restrict buf, size_t elmsize, size_t cnt)
-{
-	bs2 *s;
-	size_t todo = cnt * elmsize;
-	size_t n;
-
-	s = (bs2 *) ss->stream_data.p;
-	if (s == NULL)
-		return -1;
-	assert(ss->readonly);
-	assert(s->nr <= 1);
-
-	if (s->itotal == 0) {
-		int64_t blksize = 0;
-
-		if (s->nr) {
-			/* We read the closing block but hadn't
-			 * returned that yet. Return it now, and note
-			 * that we did by setting s->nr to 0. */
-			assert(s->nr == 1);
-			s->nr = 0;
-			return 0;
-		}
-
-		assert(s->nr == 0);
-
-		/* There is nothing more to read in the current block,
-		 * so read the count for the next block */
-		switch (mnstr_readLng(s->s, &blksize)) {
-		case -1:
-			mnstr_copy_error(ss, s->s);
-			return -1;
-		case 0:
-			ss->eof |= s->s->eof;
-			return 0;
-		case 1:
-			break;
-		}
-		if (blksize < 0) {
-			mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %" PRId64 "", blksize);
-			return -1;
-		}
-#ifdef BSTREAM_DEBUG
-		fprintf(stderr, "R1 '%s' length: %" PRId64 ", final: %s\n", ss->name, blksize >> 1, blksize & 1 ? "true" : "false");
-#endif
-		s->itotal = (size_t) (blksize >> 1);	/* amount readable */
-		/* store whether this was the last block or not */
-		s->nr = blksize & 1;
-
-		if (s->itotal > 0) {
-			/* read everything into the comp buf */
-			ssize_t uncompressed_length = (ssize_t) s->bufsiz;
-			size_t m = 0;
-			char *buf = s->buf;
-
-			if (s->comp != COMPRESSION_NONE) {
-				buf = s->compbuf;
-			}
-
-			while (m < s->itotal) {
-				ssize_t bytes_read = 0;
-				bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m);
-				if (bytes_read <= 0) {
-					ss->eof |= s->s->eof;
-					mnstr_copy_error(ss, s->s);
-					return -1;
-				}
-				m += (size_t) bytes_read;
-			}
-			if (s->comp != COMPRESSION_NONE) {
-				uncompressed_length = decompress_stream_data(s);
-				if (uncompressed_length < 0) {
-					if (s->s->errkind != MNSTR_NO__ERROR)
-						mnstr_copy_error(ss, s->s);
-					else
-						mnstr_set_error(ss, MNSTR_READ_ERROR, "uncompress failed with code %d", (int) uncompressed_length);
-					return -1;
-				}
-			} else {
-				uncompressed_length = (ssize_t) m;
-			}
-			s->itotal = (size_t) uncompressed_length;
-			s->readpos = 0;
-		}
-	}
-
-	/* Fill the caller's buffer. */
-	cnt = 0;		/* count how much we put into the buffer */
-	while (todo > 0) {
-		/* there is more data waiting in the current block, so
-		 * read it */
-		n = todo < s->itotal ? todo : s->itotal;
-
-		memcpy(buf, s->buf + s->readpos, n);
-		buf = (void *) ((char *) buf + n);
-		cnt += n;
-		todo -= n;
-		s->readpos += n;
-		s->itotal -= n;
-
-		if (s->itotal == 0) {
-			int64_t blksize = 0;
-
-			/* The current block has been completely read,
-			 * so read the count for the next block, only
-			 * if the previous was not the last one */
-			if (s->nr)
-				break;
-			switch (mnstr_readLng(s->s, &blksize)) {
-			case -1:
-				mnstr_copy_error(ss, s->s);
-				return -1;
-			case 0:
-				ss->eof |= s->s->eof;
-				return 0;
-			case 1:
-				break;
-			}
-			if (blksize < 0) {
-				mnstr_set_error(ss, MNSTR_READ_ERROR, "invalid block size %" PRId64 "", blksize);
-				return -1;
-			}
-#ifdef BSTREAM_DEBUG
-			fprintf(stderr, "R3 '%s' length: %" PRId64 ", final: %s\n", ss->name, blksize >> 1, blksize & 1 ? "true" : "false");
-#endif
-
-
-			s->itotal = (size_t) (blksize >> 1);	/* amount readable */
-			/* store whether this was the last block or not */
-			s->nr = blksize & 1;
-
-			if (s->itotal > 0) {
-				/* read everything into the comp buf */
-				ssize_t uncompressed_length = (ssize_t) s->bufsiz;
-				size_t m = 0;
-				char *buf = s->buf;
-
-				if (s->comp != COMPRESSION_NONE) {
-					buf = s->compbuf;
-				}
-
-				while (m < s->itotal) {
-					ssize_t bytes_read = 0;
-					bytes_read = s->s->read(s->s, buf + m, 1, s->itotal - m);
-					if (bytes_read <= 0) {
-						ss->eof |= s->s->eof;
-						mnstr_copy_error(ss, s->s);
-						return -1;
-					}
-					m += (size_t) bytes_read;
-				}
-				if (s->comp != COMPRESSION_NONE) {
-					uncompressed_length = decompress_stream_data(s);
-					if (uncompressed_length < 0) {
-						if (s->s->errkind != MNSTR_NO__ERROR)
-							mnstr_copy_error(ss, s->s);
-						else
-							mnstr_set_error(ss, MNSTR_READ_ERROR, "uncompress failed with code %d", (int) uncompressed_length);
-						return -1;
-					}
-				} else {
-					uncompressed_length = (ssize_t) m;
-				}
-				s->itotal = (size_t) uncompressed_length;
-				s->readpos = 0;
-			}
-		}
-	}
-	/* if we got an empty block with the end-of-sequence marker
-	 * set (low-order bit) we must only return an empty read once,
-	 * so we must squash the flag that we still have to return an
-	 * empty read */
-	if (todo > 0 && cnt == 0)
-		s->nr = 0;
-	return (ssize_t) (elmsize > 0 ? cnt / elmsize : 0);
-}
-
-
-
-static void
-bs2_resetbuf(stream *ss)
-{
-	bs2 *s = (bs2 *) ss->stream_data.p;
-	assert(ss->read == bs2_read);
-	s->itotal = 0;
-	s->nr = 0;
-	s->readpos = 0;
-}
-
-int
-bs2_resizebuf(stream *ss, size_t bufsiz)
-{
-	ssize_t compress_bound;
-	bs2 *s = (bs2 *) ss->stream_data.p;
-	assert(ss->read == bs2_read);
-
-	if (s->buf)
-		free(s->buf);
-	if (s->compbuf)
-		free(s->compbuf);
-
-	s->bufsiz = 0;
-	s->buf = NULL;
-	s->compbuf = NULL;
-
-	if ((s->buf = malloc(bufsiz)) == NULL) {
-		return -1;
-	}
-	s->bufsiz = bufsiz;
-	compress_bound = compression_size_bound(s);
-	if (compress_bound > 0) {
-		s->compbufsiz = (size_t) compress_bound;
-		s->compbuf = malloc(s->compbufsiz);
-		if (!s->compbuf) {
-			free(s->buf);
-			s->buf = NULL;
-			return -1;
-		}
-	}
-	bs2_resetbuf(ss);
-	return 0;
-}
-
-buffer
-bs2_buffer(stream *ss)
-{
-	bs2 *s = (bs2 *) ss->stream_data.p;
-	buffer b;
-	assert(ss->read == bs2_read);
-	b.buf = s->buf;
-	b.pos = s->nr;
-	b.len = s->itotal;
-	return b;
-}
-
-void
-bs2_setpos(stream *ss, size_t pos)
-{
-	bs2 *s = (bs2 *) ss->stream_data.p;
-	assert(pos < s->bufsiz);
-	s->nr = pos;
-}
-
-
-
-
-static void
-bs2_close(stream *ss)
-{
-	bs2 *s;
-
-	s = (bs2 *) ss->stream_data.p;
-	assert(s);
-	if (s == NULL)
-		return;
-	if (!ss->readonly && s->nr > 0)
-		bs2_flush(ss, MNSTR_FLUSH_DATA);
-	assert(s->s);
-	if (s->s)
-		s->s->close(s->s);
-}
-
-static void
-bs2_destroy(stream *ss)
-{
-	bs2 *s;
-
-	s = (bs2 *) ss->stream_data.p;
-	assert(s);
-	if (s) {
-		assert(s->s);
-		if (s->s)
-			s->s->destroy(s->s);
-		if (s->buf)
-			free(s->buf);
-		if (s->compbuf)
-			free(s->compbuf);
-		free(s);
-	}
-	destroy_stream(ss);
-}
-
-static void
-bs2_update_timeout(stream *ss)
-{
-	bs2 *s;
-
-	if ((s = ss->stream_data.p) != NULL && s->s) {
-		s->s->timeout = ss->timeout;
-		s->s->timeout_func = ss->timeout_func;
-		s->s->timeout_data = ss->timeout_data;
-		if (s->s->update_timeout)
-			s->s->update_timeout(s->s);
-	}
-}
-
-static int
-bs2_isalive(const stream *ss)
-{
-	struct bs2 *s;
-
-	if ((s = ss->stream_data.p) != NULL && s->s) {
-		if (s->s->isalive)
-			return s->s->isalive(s->s);
-		return 1;
-	}
-	return 0;
-}
-
-stream *
-block_stream2(stream *s, size_t bufsiz, compression_method comp)
-{
-	stream *ns;
-	stream *os = NULL;
-	bs2 *b;
-
-	if (s == NULL)
-		return NULL;
-	if (s->read == bs_read || s->write == bs_write) {
-		/* if passed in a block_stream instance, extract the
-		 * underlying stream */
-		os = s;
-		s = s->inner;
-	}
-
-#ifdef STREAM_DEBUG
-	fprintf(stderr, "block_stream2 %s\n", s->name ? s->name : "<unnamed>");
-#endif
-	if ((ns = create_wrapper_stream(NULL, s)) == NULL)
-		return NULL;
-	if ((b = bs2_create(s, bufsiz, comp)) == NULL) {
-		destroy_stream(ns);
-		mnstr_set_open_error(s->name, 0, "bs2_create failed");
-		return NULL;
-	}
-	/* blocksizes have a fixed little endian byteorder */
-#ifdef WORDS_BIGENDIAN
-	s->swapbytes = true;
-#endif
-	ns->binary = s->binary;
-	ns->readonly = s->readonly;
-	ns->close = bs2_close;
-	ns->clrerr = bs_clrerr;
-	ns->destroy = bs2_destroy;
-	ns->flush = bs2_flush;
-	ns->read = bs2_read;
-	ns->write = bs2_write;
-	ns->update_timeout = bs2_update_timeout;
-	ns->isalive = bs2_isalive;
-	ns->stream_data.p = (void *) b;
-
-	if (os != NULL) {
-		/* we extracted the underlying stream, destroy the old
-		 * shell */
-		os->inner = NULL;
-		bs_destroy(os);
-	}
-
-	return ns;
-}
rename from clients/mapilib/mapi_prompt.h
rename to common/stream/mapi_prompt.h
new file mode 100644
--- /dev/null
+++ b/common/stream/mapi_stream.c
@@ -0,0 +1,154 @@
+/*
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0.  If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright 1997 - July 2008 CWI, August 2008 - 2022 MonetDB B.V.
+ */
+
+#include "monetdb_config.h"
+#include "stream.h"
+#include "stream_internal.h"
+#include "mapi_prompt.h"
+
+
+static void
+discard(stream *s)
+{
+	static char bitbucket[8192];
+	while (1) {
+		ssize_t nread = mnstr_read(s, bitbucket, 1, sizeof(bitbucket));
+		if (nread <= 0)
+			return;
+		assert(1);
+	}
+}
+
+struct mapi_recv_upload {
+	stream *from_client; // set to NULL after sending MAPI_PROMPT3
+	stream *to_client; // set to NULL when client sends empty
+};
+
+static ssize_t
+recv_upload_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
+{
+	struct mapi_recv_upload *state = s->stream_data.p;
+
+	if (state->from_client == NULL) {
+		assert(s->eof);
+		return 0;
+	}
+
+	ssize_t nread = mnstr_read(state->from_client, buf, elmsize, cnt);
+	if (nread != 0 || state->from_client->eof)
+		return nread;
+
+	// before returning the 0 we send the prompt and make another attempt.
+	if (
+			mnstr_write(state->to_client, PROMPT2, strlen(PROMPT2), 1) != 1
+		||	mnstr_flush(state->to_client, MNSTR_FLUSH_ALL) < 0
+	) {
+		mnstr_set_error(s, mnstr_errnr(state->to_client), "%s", mnstr_peek_error(state->to_client));
+		return -1;
+	}
+
+	// if it succeeds, return that to the client.
+	// if it's still a block boundary, return that to the client.
+	// if there's an error, return that to the client.
+	nread = mnstr_read(state->from_client, buf, elmsize, cnt);
+	if (nread > 0)
+		return nread;
+	if (nread == 0) {
+		s->eof = true;
+		state->from_client = NULL;
+		return nread;
+	} else {
+		mnstr_set_error(s, mnstr_errnr(state->from_client), "%s", mnstr_peek_error(state->from_client));
+		return -1;
+	}
+}
+
+static void
+recv_upload_close(stream *s)
+{
+	struct mapi_recv_upload *state = s->stream_data.p;
+
+	stream *from = state->from_client;
+	if (from)
+		discard(from);
+
+	stream *to = state->to_client;
+	mnstr_write(to, PROMPT3, strlen(PROMPT3), 1);
+	mnstr_flush(to, MNSTR_FLUSH_ALL);
+}
+
+static void
+recv_upload_destroy(stream *s)
+{
+	struct mapi_recv_upload *state = s->stream_data.p;
+	free(state);
+	free(s);
+}
+
+
+stream*
+mapi_request_upload(const char *filename, bool binary, bstream *bs, stream *ws)
+{
+	const char *msg = NULL;
+	stream *s = NULL;
+	struct mapi_recv_upload *state = NULL;
+	ssize_t nwritten;
+
+	while (!bs->eof)
+		bstream_next(bs);
+	stream *rs = bs->s;
+	assert(isa_block_stream(ws));
+	assert(isa_block_stream(rs));
+
+	if (binary)
+		nwritten = mnstr_printf(ws, "%srb %s\n", PROMPT3, filename);
+	else
+		nwritten = mnstr_printf(ws, "%sr 0 %s\n", PROMPT3, filename);
+	if (nwritten <= 0) {
+		msg = mnstr_peek_error(ws);
+		goto end;
+	}
+	if (mnstr_flush(ws, MNSTR_FLUSH_ALL) < 0) {
+		msg = mnstr_peek_error(ws);
+		goto end;
+	}
+
+	char buf[256];
+	if (mnstr_readline(rs, buf, sizeof(buf)) != 1 || buf[0] != '\n') {
+		msg = buf;
+		discard(rs);
+		goto end;
+	}
+
+	// Client accepted the request
+	state = malloc(sizeof(*state));
+	if (!state) {
+		msg = "malloc failed";
+		goto end;
+	}
+	s = create_stream("ONCLIENT");
+	if (!s) {
+		msg = mnstr_peek_error(NULL);
+		goto end;
+	}
+	state->from_client = rs;
+	state->to_client = ws;
+	s->stream_data.p = state;
+	s->binary= binary;
+	s->read = recv_upload_read;
+	s->close = recv_upload_close;
+	s->destroy = recv_upload_destroy;
+end:
+	if (msg) {
+		mnstr_destroy(s);
+		mnstr_set_open_error(filename, 0, "ON CLIENT: %s", msg);
+		return NULL;
+	} else {
+		return s;
+	}
+}
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -954,9 +954,7 @@ isa_block_stream(const stream *s)
 	assert(s != NULL);
 	return s &&
 		((s->read == bs_read ||
-		  s->write == bs_write) ||
-		 (s->read == bs2_read ||
-		  s->write == bs2_write));
+		  s->write == bs_write));
 }
 
 
--- a/common/stream/stream.h
+++ b/common/stream/stream.h
@@ -219,8 +219,6 @@ stream_export buffer *mnstr_get_buffer(s
 stream_export stream *block_stream(stream *s); // mapi.c, mal_mapi.c, client.c, merovingian
 stream_export bool isa_block_stream(const stream *s); // mapi.c, mal_client.c, remote.c, sql_scenario.c/sqlReader, sql_scan.c
 stream_export stream *bs_stream(stream *s); // unused
-stream_export void set_prompting(stream *block_stream, const char *prompt, stream *prompt_stream);
-
 
 typedef enum {
 	PROTOCOL_AUTO = 0, // unused
@@ -228,18 +226,6 @@ typedef enum {
 	PROTOCOL_COLUMNAR = 3 // sql_result.c
 } protocol_version;
 
-typedef enum {
-	COMPRESSION_NONE = 0, // mal_mapi.c
-	COMPRESSION_SNAPPY = 1, // mcrypt.c, mal_mapi.c
-	COMPRESSION_LZ4 = 2, // same
-	COMPRESSION_AUTO = 255 // never used
-} compression_method;
-
-stream_export stream *block_stream2(stream *s, size_t bufsiz, compression_method comp); // mal_mapi.c
-stream_export int bs2_resizebuf(stream *ss, size_t bufsiz); // sql_result.c
-stream_export buffer bs2_buffer(stream *s); // sql_result.c
-stream_export void bs2_setpos(stream *ss, size_t pos); // sql_result.c
-
 
 /* read block of data including the end of block marker */
 stream_export ssize_t mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt);
@@ -276,7 +262,8 @@ stream_export stream *stream_blackhole_c
 
 stream_export stream *stream_fwf_create(stream *restrict s, size_t num_fields, size_t *restrict widths, char filler); // sql.c
 
-
 stream_export stream *create_text_stream(stream *s);
 
+stream_export stream *mapi_request_upload(const char *filename, bool binary, bstream *rs, stream *ws);
+
 #endif /*_STREAM_H_*/
--- a/common/stream/stream_internal.h
+++ b/common/stream/stream_internal.h
@@ -271,8 +271,6 @@ struct bs {
 	unsigned itotal;	/* amount available in current read block */
 	size_t blks;		/* read/writen blocks (possibly partial) */
 	size_t bytes;		/* read/writen bytes */
-	const char *prompt;	/* on eof, first try to send this then try again */
-	stream *pstream;	/* stream to send prompts on */
 	char buf[BLOCK];	/* the buffered data (minus the size of
 				 * size-short */
 };
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -3101,7 +3101,6 @@ mvc_import_table_wrap(Client cntxt, MalB
 {
 	backend *be;
 	BAT **b = NULL;
-	ssize_t len = 0;
 	sql_table *t = *(sql_table **) getArgReference(stk, pci, pci->retc + 0);
 	const char *tsep = *getArgReference_str(stk, pci, pci->retc + 1);
 	const char *rsep = *getArgReference_str(stk, pci, pci->retc + 2);
@@ -3137,41 +3136,14 @@ mvc_import_table_wrap(Client cntxt, MalB
 		msg = mvc_import_table(cntxt, &b, be->mvc, be->mvc->scanner.rs, t, tsep, rsep, ssep, ns, sz, offset, besteffort, true, escape);
 	} else {
 		if (onclient) {
-			mnstr_write(be->mvc->scanner.ws, PROMPT3, sizeof(PROMPT3)-1, 1);
-			if (offset > 1 && rsep && rsep[0] == '\n' && rsep[1] == '\0') {
-				/* only let client skip simple lines */
-				mnstr_printf(be->mvc->scanner.ws, "r " LLFMT " %s\n",
-					     offset, fname);
-				offset = 0;
-			} else {
-				mnstr_printf(be->mvc->scanner.ws, "r 0 %s\n", fname);
-			}
-			msg = MAL_SUCCEED;
-			mnstr_flush(be->mvc->scanner.ws, MNSTR_FLUSH_DATA);
-			while (!be->mvc->scanner.rs->eof)
-				bstream_next(be->mvc->scanner.rs);
-			ss = be->mvc->scanner.rs->s;
-			char buf[80];
-			if ((len = mnstr_readline(ss, buf, sizeof(buf))) > 1) {
-				if (buf[0] == '!' && buf[6] == '!')
-					msg = createException(IO, "sql.copy_from", "%.7s%s: %s", buf, fname, buf+7);
-				else
-					msg = createException(IO, "sql.copy_from", "%s: %s", fname, buf);
-				while (buf[len - 1] != '\n' &&
-				       (len = mnstr_readline(ss, buf, sizeof(buf))) > 0)
-					;
-				/* read until flush marker */
-				while (mnstr_read(ss, buf, 1, sizeof(buf)) > 0)
-					;
-				return msg;
-			}
+			ss = mapi_request_upload(fname, false, be->mvc->scanner.rs, be->mvc->scanner.ws);
 		} else {
 			ss = open_rastream(fname);
-			if (ss == NULL || mnstr_errnr(ss)) {
-				msg = createException(IO, "sql.copy_from", SQLSTATE(42000) "%s", mnstr_peek_error(NULL));
-				close_stream(ss);
-				return msg;
-			}
+		}
+		if (ss == NULL || mnstr_errnr(ss)) {
+			msg = createException(IO, "sql.copy_from", SQLSTATE(42000) "%s", mnstr_peek_error(NULL));
+			close_stream(ss);
+			return msg;
 		}
 
 		if (!strNil(fixed_widths)) {
@@ -3220,12 +3192,7 @@ mvc_import_table_wrap(Client cntxt, MalB
 			throw(MAL, "sql.copy_from", SQLSTATE(HY013) MAL_MALLOC_FAIL);
 		}
 		msg = mvc_import_table(cntxt, &b, be->mvc, s, t, tsep, rsep, ssep, ns, sz, offset, besteffort, false, escape);
-		if (onclient) {
-			mnstr_write(be->mvc->scanner.ws, PROMPT3, sizeof(PROMPT3)-1, 1);
-			mnstr_flush(be->mvc->scanner.ws, MNSTR_FLUSH_DATA);
-			be->mvc->scanner.rs->eof = s->eof;
-			s->s = NULL;
-		}
+		// This also closes ss:
 		bstream_destroy(s);
 	}
 	if (b && !msg)
--- a/sql/backends/monet5/sql_bincopyfrom.c
+++ b/sql/backends/monet5/sql_bincopyfrom.c
@@ -543,70 +543,6 @@ load_column(struct type_rec *rec, const 
 		return msg;
 }
 
-
-static str
-start_mapi_file_upload(backend *be, str path, stream **s)
-{
-	str msg = MAL_SUCCEED;
-	*s = NULL;
-
-	stream *ws = be->mvc->scanner.ws;
-	bstream *bs = be->mvc->scanner.rs;
-	stream *rs = bs->s;
-	assert(isa_block_stream(ws));
-	assert(isa_block_stream(rs));
-
-	mnstr_write(ws, PROMPT3, sizeof(PROMPT3)-1, 1);
-	mnstr_printf(ws, "rb %s\n", path);
-	mnstr_flush(ws, MNSTR_FLUSH_DATA);
-	while (!bs->eof)
-		bstream_next(bs);
-	char buf[80];
-	if (mnstr_readline(rs, buf, sizeof(buf)) > 1) {
-		msg = createException(IO, "sql.importColumn", "Error %s", buf);
-		goto end;
-	}
-	set_prompting(rs, PROMPT2, ws);
-
-	*s = rs;
-end:
-	return msg;
-}
-
-
-static str
-finish_mapi_file_upload(backend *be, bool eof_reached)
-{
-	str msg = MAL_SUCCEED;
-	stream *ws = be->mvc->scanner.ws;
-	bstream *bs = be->mvc->scanner.rs;
-	stream *rs = bs->s;
-	assert(isa_block_stream(ws));
-	assert(isa_block_stream(rs));
-
-	set_prompting(rs, NULL, NULL);
-	if (!eof_reached) {
-		// Probably due to an error. Read until message boundary.
-		char buf[8190];
-		while (1) {
-			ssize_t nread = mnstr_read(rs, buf, 1, sizeof(buf));
-			if (nread > 0)
-				continue;
-			if (nread < 0)
-				msg = createException(
-					IO, "sql.importColumn",
-					"while syncing read stream: %s", mnstr_peek_error(rs));
-			break;
-		}
-	}
-	mnstr_write(ws, PROMPT3, sizeof(PROMPT3)-1, 1);
-	mnstr_flush(ws, MNSTR_FLUSH_DATA);
-
-	return msg;
-}
-
-
-
 /* Import a single file into a new BAT.
  */
 static str
@@ -620,7 +556,6 @@ importColumn(backend *be, bat *ret, BUN 
 	int gdk_type;
 	BAT *bat = NULL;
 	stream *stream_to_close = NULL;
-	bool do_finish_mapi = false;
 	int eof_reached = -1; // 1 = read to the end; 0 = stopped reading early; -1 = unset, a bug.
 
 	// This one is not managed by the end: block
@@ -645,15 +580,13 @@ importColumn(backend *be, bat *ret, BUN 
 
 	// Open the input stream
 	if (onclient) {
-		s = NULL;
-		do_finish_mapi = true;
-		msg = start_mapi_file_upload(be, path, &s);
-		if (msg != MAL_SUCCEED)
-			goto end;
+		s = stream_to_close = mapi_request_upload(path, true, be->mvc->scanner.rs, be->mvc->scanner.ws);
 	} else {
 		s = stream_to_close = open_rstream(path);
-		if (s == NULL)
-			bailout("Couldn't open '%s' on server: %s", path, mnstr_peek_error(NULL));
+	}
+	if (!s) {
+		msg = mnstr_error(NULL);
+		goto end;
 	}
 
 	// Do the work
@@ -667,12 +600,6 @@ importColumn(backend *be, bat *ret, BUN 
 
 	// Fall through into the end block which will clean things up
 end:
-	if (do_finish_mapi) {
-		str msg1 = finish_mapi_file_upload(be, eof_reached == 1);
-		if (msg == MAL_SUCCEED)
-			msg = msg1;
-	}
-
 	if (stream_to_close)
 		close_stream(stream_to_close);