LCOV - code coverage report
Current view: top level - common/stream - stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 273 352 77.6 %
Date: 2024-04-25 20:03:45 Functions: 40 54 74.1 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /* stream
      14             :  * ======
      15             :  * Niels Nes
      16             :  * An simple interface to streams
      17             :  *
      18             :  * Processing files, streams, and sockets is quite different on Linux
      19             :  * and Windows platforms. To improve portability between both, we advise
      20             :  * to replace the stdio actions with the stream functionality provided
      21             :  * here.
      22             :  *
      23             :  * This interface can also be used to open 'non compressed, gzipped,
      24             :  * bz2zipped' data files and sockets. Using this interface one could
      25             :  * easily switch between the various underlying storage types.
      26             :  *
      27             :  * buffered streams
      28             :  * ----------------
      29             :  *
      30             :  * The bstream (or buffered_stream) can be used for efficient reading of
      31             :  * a stream. Reading can be done in large chunks and access can be done
      32             :  * in smaller bits, by directly accessing the underlying buffer.
      33             :  *
      34             :  * Beware that a flush on a buffered stream emits an empty block to
      35             :  * synchronize with the other side, telling it has reached the end of
      36             :  * the sequence and can close its descriptors.
      37             :  *
      38             :  * bstream functions
      39             :  * -----------------
      40             :  *
      41             :  * The bstream_create gets a read stream (rs) as input and the initial
      42             :  * chunk size and creates a buffered stream from this. A spare byte is
      43             :  * kept at the end of the buffer.  The bstream_read will at least read
      44             :  * the next 'size' bytes. If the not read data (aka pos < len) together
      45             :  * with the new data will not fit in the current buffer it is resized.
      46             :  * The spare byte is kept.
      47             :  *
      48             :  * tee streams
      49             :  * -----------
      50             :  *
      51             :  * A tee stream is a write stream that duplicates all output to two
      52             :  * write streams of the same type (txt/bin).
      53             :  */
      54             : 
      55             : /* Generic stream handling code such as init and close */
      56             : 
      57             : #include "monetdb_config.h"
      58             : #include "stream.h"
      59             : #include "stream_internal.h"
      60             : #include <stdio.h>
      61             : 
      62             : 
      63             : #ifdef HAVE_PTHREAD_H
      64             : #include <pthread.h>
      65             : #endif
      66             : 
      67             : struct tl_error_buf {
      68             :         char msg[1024];
      69             : };
      70             : 
      71             : static int tl_error_init(void);
      72             : static struct tl_error_buf *get_tl_error_buf(void);
      73             : 
      74             : #ifdef HAVE_PTHREAD_H
      75             : 
      76             : static pthread_key_t tl_error_key;
      77             : 
      78             : static void
      79         528 : clear_main_tl_error_buf(void)
      80             : {
      81         528 :         void *p = pthread_getspecific(tl_error_key);
      82         528 :         if (p != NULL) {
      83         528 :                 pthread_setspecific(tl_error_key, NULL);
      84         528 :                 free(p);
      85             :         }
      86         528 : }
      87             : 
      88             : static int
      89         528 : tl_error_init(void)
      90             : {
      91         528 :         if (pthread_key_create(&tl_error_key, free) != 0)
      92             :                 return -1;
      93             :         // Turns out the destructor registered with pthread_key_create() does not
      94             :         // always run for the main thread. This atexit hook clears the main thread's
      95             :         // error buffer to avoid this being reported as a memory leak.
      96         528 :         atexit(clear_main_tl_error_buf);
      97         528 :         return 0;
      98             : }
      99             : 
     100             : static struct tl_error_buf*
     101      387320 : get_tl_error_buf(void)
     102             : {
     103      387320 :         struct tl_error_buf *p = pthread_getspecific(tl_error_key);
     104      387319 :         if (p == NULL) {
     105        3966 :                 p = malloc(sizeof(*p));
     106        3966 :                 if (p == NULL)
     107             :                         return NULL;
     108        3966 :                 *p = (struct tl_error_buf) { .msg = {0} };
     109        3966 :                 pthread_setspecific(tl_error_key, p);
     110        3966 :                 struct tl_error_buf *second_attempt = pthread_getspecific(tl_error_key);
     111        3966 :                 assert(p == second_attempt && "maybe mnstr_init has not been called?");
     112             :                 (void) second_attempt; // suppress warning if asserts disabled
     113             :         }
     114             :         return p;
     115             : }
     116             : 
     117             : #elif defined(WIN32)
     118             : 
     119             : static DWORD tl_error_key = 0;
     120             : 
     121             : static int
     122             : tl_error_init(void)
     123             : {
     124             :         DWORD key = TlsAlloc();
     125             :         if (key == TLS_OUT_OF_INDEXES)
     126             :                 return -1;
     127             :         else {
     128             :                 tl_error_key = key;
     129             :                 return 0;
     130             :         }
     131             : }
     132             : 
     133             : static struct tl_error_buf*
     134             : get_tl_error_buf(void)
     135             : {
     136             :         struct tl_error_buf *p = TlsGetValue(tl_error_key);
     137             : 
     138             :         if (p == NULL) {
     139             :                 if (GetLastError() != ERROR_SUCCESS)
     140             :                         return NULL; // something went terribly wrong
     141             : 
     142             :                 // otherwise, initialize
     143             :                 p = malloc(sizeof(*p));
     144             :                 if (p == NULL)
     145             :                         return NULL;
     146             :                 *p = (struct tl_error_buf) { .msg = 0 };
     147             :                 if (!TlsSetValue(tl_error_key, p)) {
     148             :                         free(p);
     149             :                         return NULL;
     150             :                 }
     151             : 
     152             :                 struct tl_error_buf *second_attempt = TlsGetValue(tl_error_key);
     153             :                 assert(p == second_attempt /* maybe mnstr_init has not been called? */);
     154             :                 (void) second_attempt; // suppress warning if asserts disabled
     155             :         }
     156             :         return p;
     157             : }
     158             : 
     159             : #else
     160             : 
     161             : #error "no pthreads and no Windows, don't know what to do"
     162             : 
     163             : #endif
     164             : 
     165             : static const char *mnstr_error_kind_description(mnstr_error_kind kind);
     166             : 
     167             : int
     168         720 : mnstr_init(void)
     169             : {
     170         720 :         static ATOMIC_FLAG inited = ATOMIC_FLAG_INIT;
     171             : 
     172         720 :         if (ATOMIC_TAS(&inited))
     173             :                 return 0;
     174             : 
     175         528 :         if (tl_error_init()< 0)
     176             :                 return -1;
     177             : 
     178             : #ifdef NATIVE_WIN32
     179             :         WSADATA w;
     180             :         if (WSAStartup(0x0101, &w) != 0)
     181             :                 return -1;
     182             : #endif
     183             : 
     184             :         return 0;
     185             : }
     186             : 
     187             : const char *
     188           0 : mnstr_version(void)
     189             : {
     190           0 :         return STREAM_VERSION;
     191             : }
     192             : 
     193             : /* Read at most cnt elements of size elmsize from the stream.  Returns
     194             :  * the number of elements actually read or < 0 on failure. */
     195             : ssize_t
     196    17184422 : mnstr_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
     197             : {
     198    17184422 :         if (s == NULL || buf == NULL)
     199             :                 return -1;
     200             : #ifdef STREAM_DEBUG
     201             :         fprintf(stderr, "read %s %zu %zu\n",
     202             :                 s->name ? s->name : "<unnamed>", elmsize, cnt);
     203             : #endif
     204    17184422 :         assert(s->readonly);
     205    17184422 :         if (s->errkind != MNSTR_NO__ERROR)
     206             :                 return -1;
     207    17184422 :         return s->read(s, buf, elmsize, cnt);
     208             : }
     209             : 
     210             : 
     211             : /* Write cnt elements of size elmsize to the stream.  Returns the
     212             :  * number of elements actually written.  If elmsize or cnt equals zero,
     213             :  * returns cnt. */
     214             : ssize_t
     215    33459996 : mnstr_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     216             : {
     217    33459996 :         if (s == NULL || buf == NULL)
     218             :                 return -1;
     219             : #ifdef STREAM_DEBUG
     220             :         fprintf(stderr, "write %s %zu %zu\n",
     221             :                 s->name ? s->name : "<unnamed>", elmsize, cnt);
     222             : #endif
     223    33459994 :         assert(!s->readonly);
     224    33459994 :         if (s->errkind != MNSTR_NO__ERROR)
     225             :                 return -1;
     226    33459984 :         return s->write(s, buf, elmsize, cnt);
     227             : }
     228             : 
     229             : 
     230             : /* Read one line (seperated by \n) of at most maxcnt-1 characters from
     231             :  * the stream.  Returns the number of characters actually read,
     232             :  * includes the trailing \n; terminated by a NULL byte. */
     233             : ssize_t
     234      108847 : mnstr_readline(stream *restrict s, void *restrict buf, size_t maxcnt)
     235             : {
     236      108847 :         char *b = buf, *start = buf;
     237             : 
     238      108847 :         if (s == NULL || buf == NULL)
     239             :                 return -1;
     240             : #ifdef STREAM_DEBUG
     241             :         fprintf(stderr, "readline %s %zu\n",
     242             :                 s->name ? s->name : "<unnamed>", maxcnt);
     243             : #endif
     244      108847 :         assert(s->readonly);
     245      108847 :         if (s->errkind != MNSTR_NO__ERROR)
     246             :                 return -1;
     247      108847 :         if (maxcnt == 0)
     248             :                 return 0;
     249      108847 :         if (maxcnt == 1) {
     250           0 :                 *start = 0;
     251           0 :                 return 0;
     252             :         }
     253    18247315 :         for (;;) {
     254    18247315 :                 switch (s->read(s, start, 1, 1)) {
     255    18247055 :                 case 1:
     256             :                         /* successfully read a character,
     257             :                          * check whether it is the line
     258             :                          * separator and whether we have space
     259             :                          * left for more */
     260    18247055 :                         if (*start++ == '\n' || --maxcnt == 1) {
     261      108587 :                                 *start = 0;
     262      108587 :                                 return (ssize_t) (start - b);
     263             :                         }
     264             :                         break;
     265           0 :                 case -1:
     266             :                         /* error: if we didn't read anything yet,
     267             :                          * return the error, otherwise return what we
     268             :                          * have */
     269           0 :                         if (start == b)
     270             :                                 return -1;
     271             :                         /* fall through */
     272             :                 case 0:
     273             :                         /* end of file: return what we have */
     274         260 :                         *start = 0;
     275         260 :                         return (ssize_t) (start - b);
     276             :                 }
     277             :         }
     278             : }
     279             : 
     280             : 
     281             : void
     282       38206 : mnstr_settimeout(stream *s, unsigned int ms, bool (*func)(void *), void *data)
     283             : {
     284       38206 :         if (s) {
     285       38206 :                 s->timeout = ms;
     286       38206 :                 s->timeout_func = func;
     287       38206 :                 s->timeout_data = data;
     288       38206 :                 if (s->update_timeout)
     289       38206 :                         s->update_timeout(s);
     290             :         }
     291       38205 : }
     292             : 
     293             : 
     294             : void
     295       79968 : mnstr_close(stream *s)
     296             : {
     297       79968 :         if (s) {
     298             : #ifdef STREAM_DEBUG
     299             :                 fprintf(stderr, "close %s\n", s->name ? s->name : "<unnamed>");
     300             : #endif
     301       79968 :                 s->close(s);
     302             :         }
     303       79968 : }
     304             : 
     305             : 
     306             : void
     307        1379 : mnstr_destroy(stream *s)
     308             : {
     309        1379 :         if (s) {
     310             : #ifdef STREAM_DEBUG
     311             :                 fprintf(stderr, "destroy %s\n",
     312             :                         s->name ? s->name : "<unnamed>");
     313             : #endif
     314        1373 :                 s->destroy(s);
     315             :         }
     316        1379 : }
     317             : 
     318             : void
     319         317 : mnstr_va_set_error(stream *s, mnstr_error_kind kind, const char *fmt, va_list ap)
     320             : {
     321         317 :         if (s == NULL)
     322             :                 return;
     323             : 
     324         317 :         s->errkind = kind;
     325             : 
     326         317 :         if (kind == MNSTR_NO__ERROR) {
     327           0 :                 s->errmsg[0] = '\0';
     328           0 :                 return;
     329             :         }
     330             : 
     331         317 :         char *start = &s->errmsg[0];
     332         317 :         char *end = start + sizeof(s->errmsg);
     333         317 :         if (s->name != NULL)
     334         317 :                 start += snprintf(start, end - start, "stream %s: ", s->name);
     335             : 
     336         317 :         if (start >= end - 1)
     337             :                 return;
     338             : 
     339         317 :         if (fmt == NULL)
     340          19 :                 fmt = mnstr_error_kind_description(kind);
     341             : 
     342             :         // Complicated pointer dance in order to shut up 'might be a candidate
     343             :         // for gnu_printf format attribute' warning from gcc.
     344             :         // It's really eager to trace where the vsnprintf ends up, we need
     345             :         // the ? : to throw it off its scent.
     346             :         // Similarly, the parentheses around the 1 serve to suppress a Clang
     347             :         // warning about dead code (the atoi).
     348         317 :         void *f1 = (1) ? (void*)&vsnprintf : (void*)&atoi;
     349         317 :         int (*f)(char *str, size_t size, const char *format, va_list ap) = f1;
     350         317 :         f(start, end - start, fmt, ap);
     351             : }
     352             : 
     353             : void
     354          20 : mnstr_set_error(stream *s, mnstr_error_kind kind, const char *fmt, ...)
     355             : {
     356          20 :         va_list ap;
     357          20 :         va_start(ap, fmt);
     358          20 :         mnstr_va_set_error(s, kind, fmt, ap);
     359          20 :         va_end(ap);
     360          20 : }
     361             : 
     362             : static size_t my_strerror_r(int error_nr, char *buf, size_t len);
     363             : 
     364             : void
     365         297 : mnstr_set_error_errno(stream *s, mnstr_error_kind kind, const char *fmt, ...)
     366             : {
     367         297 :         va_list ap;
     368         297 :         va_start(ap, fmt);
     369         297 :         mnstr_va_set_error(s, kind, fmt, ap);
     370         297 :         va_end(ap);
     371             : 
     372             :         /* append as much as fits of the system error message */
     373         297 :         char *start = &s->errmsg[0] + strlen(s->errmsg);
     374         297 :         char *end = &s->errmsg[0] + sizeof(s->errmsg);
     375         297 :         if (end - start >= 3) {
     376         297 :                 start = stpcpy(start, ": ");
     377         297 :                 start += my_strerror_r(errno, start, end - start);
     378             :         }
     379         297 : }
     380             : 
     381             : 
     382             : void
     383      387317 : mnstr_set_open_error(const char *name, int errnr, const char *fmt, ...)
     384             : {
     385      387317 :         va_list ap;
     386             : 
     387      387317 :         struct tl_error_buf *buf = get_tl_error_buf();
     388      387315 :         if (buf == NULL)
     389      387186 :                 return; // hopeless
     390             : 
     391      387315 :         if (errnr == 0 && fmt == NULL) {
     392      387186 :                 buf->msg[0] = '\0';
     393      387186 :                 return;
     394             :         }
     395             : 
     396         129 :         char *start = &buf->msg[0];
     397         129 :         char *end = start + sizeof(buf->msg);
     398             : 
     399         129 :         if (name != NULL)
     400         129 :                 start += snprintf(start, end - start, "when opening %s: ", name);
     401         129 :         if (start >= end - 1)
     402             :                 return;
     403             : 
     404         129 :         if (fmt != NULL) {
     405         129 :                 va_start(ap, fmt);
     406         129 :                 start += vsnprintf(start, end - start, fmt, ap);
     407         129 :                 va_end(ap);
     408             :         }
     409         129 :         if (start >= end - 1)
     410             :                 return;
     411             : 
     412         129 :         if (errnr != 0 && end - start >= 3) {
     413         127 :                 start = stpcpy(start, ": ");
     414         127 :                 start += my_strerror_r(errno, start, end - start);
     415             :         }
     416         129 :         if (start >= end - 1)
     417             :                 return;
     418             : }
     419             : 
     420             : static size_t
     421         424 : my_strerror_r(int error_nr, char *buf, size_t buflen)
     422             : {
     423             :         // Three cases:
     424             :         // 1. no strerror_r
     425             :         // 2. gnu strerror_r (returns char* and does not always fill buffer)
     426             :         // 3. xsi strerror_r (returns int and always fills the buffer)
     427         424 :         char *to_move;
     428             : #ifndef HAVE_STRERROR_R
     429             :         // Hope for the best
     430             :         to_move = strerror(error_nr);
     431             : #elif !defined(_GNU_SOURCE) || !_GNU_SOURCE
     432             :         // standard strerror_r always writes to buf
     433             :         int result_code = strerror_r(error_nr, buf, buflen);
     434             :         if (result_code == 0)
     435             :                 to_move = NULL;
     436             :         else
     437             :                 to_move = "<failed to retrieve error message>";
     438             : #else
     439             :         // gnu strerror_r sometimes only returns static string, needs copy
     440         424 :         to_move = strerror_r(error_nr, buf, buflen);
     441             : #endif
     442         424 :         if (to_move != NULL) {
     443             :                 // move to buffer
     444         424 :                 size_t size = strlen(to_move) + 1;
     445         424 :                 assert(size <= buflen);
     446             :                 // strerror_r may have return a pointer to/into the buffer
     447         424 :                 memmove(buf, to_move, size);
     448         424 :                 return size - 1;
     449             :         } else {
     450           0 :                 return strlen(buf);
     451             :         }
     452             : }
     453             : 
     454             : 
     455             : 
     456         315 : void mnstr_copy_error(stream *dst, stream *src)
     457             : {
     458         315 :         dst->errkind = src->errkind;
     459         315 :         memcpy(dst->errmsg, src->errmsg, sizeof(dst->errmsg));
     460         315 : }
     461             : 
     462             : char *
     463           0 : mnstr_error(const stream *s)
     464             : {
     465           0 :         const char *msg = mnstr_peek_error(s);
     466           0 :         if (msg != NULL)
     467           0 :                 return strdup(msg);
     468             :         else
     469             :                 return NULL;
     470             : }
     471             : 
     472             : const char*
     473           7 : mnstr_peek_error(const stream *s)
     474             : {
     475           7 :         if (s == NULL) {
     476           3 :                 struct tl_error_buf *b = get_tl_error_buf();
     477           3 :                 if (b != NULL)
     478           3 :                         return b->msg;
     479             :                 else
     480             :                         return "unknown error";
     481             :         }
     482             : 
     483           4 :         if (s->errkind == MNSTR_NO__ERROR)
     484             :                 return "no error";
     485             : 
     486           4 :         if (s->errmsg[0] != '\0')
     487           4 :                 return s->errmsg;
     488             : 
     489           0 :         return mnstr_error_kind_description(s->errkind);
     490             : }
     491             : 
     492             : static const char *
     493          19 : mnstr_error_kind_description(mnstr_error_kind kind)
     494             : {
     495          19 :         switch (kind) {
     496             :         case MNSTR_NO__ERROR:
     497             :                 /* unreachable */
     498           0 :                 assert(0);
     499             :                 return NULL;
     500             :         case MNSTR_OPEN_ERROR:
     501             :                 return "error could not open";
     502           1 :         case MNSTR_READ_ERROR:
     503           1 :                 return "error reading";
     504           0 :         case MNSTR_WRITE_ERROR:
     505           0 :                 return "error writing";
     506             :         case MNSTR_TIMEOUT:
     507             :                 return "timeout";
     508             :         case MNSTR_UNEXPECTED_EOF:
     509             :                 return "timeout";
     510             :         }
     511             : 
     512           0 :         return "Unknown error";
     513             : }
     514             : 
     515             : /* flush buffer, return 0 on success, non-zero on failure */
     516             : int
     517      730083 : mnstr_flush(stream *s, mnstr_flush_level flush_level)
     518             : {
     519      730083 :         if (s == NULL)
     520             :                 return -1;
     521             : #ifdef STREAM_DEBUG
     522             :         fprintf(stderr, "flush %s\n", s->name ? s->name : "<unnamed>");
     523             : #endif
     524      730082 :         assert(!s->readonly);
     525      730082 :         if (s->errkind != MNSTR_NO__ERROR)
     526             :                 return -1;
     527      730082 :         if (s->flush)
     528      728627 :                 return s->flush(s, flush_level);
     529             :         return 0;
     530             : }
     531             : 
     532             : 
     533             : /* sync file to disk, return 0 on success, non-zero on failure */
     534             : int
     535          36 : mnstr_fsync(stream *s)
     536             : {
     537          36 :         if (s == NULL)
     538             :                 return -1;
     539             : #ifdef STREAM_DEBUG
     540             :         fprintf(stderr, "fsync %s (%d)\n",
     541             :                 s->name ? s->name : "<unnamed>", s->errnr);
     542             : #endif
     543          36 :         assert(!s->readonly);
     544          36 :         if (s->errkind != MNSTR_NO__ERROR)
     545             :                 return -1;
     546          36 :         if (s->fsync)
     547          36 :                 return s->fsync(s);
     548             :         return 0;
     549             : }
     550             : 
     551             : 
     552             : int
     553           0 : mnstr_fgetpos(stream *restrict s, fpos_t *restrict p)
     554             : {
     555           0 :         if (s == NULL || p == NULL)
     556             :                 return -1;
     557             : #ifdef STREAM_DEBUG
     558             :         fprintf(stderr, "fgetpos %s\n", s->name ? s->name : "<unnamed>");
     559             : #endif
     560           0 :         if (s->errkind != MNSTR_NO__ERROR)
     561             :                 return -1;
     562           0 :         if (s->fgetpos)
     563           0 :                 return s->fgetpos(s, p);
     564             :         return 0;
     565             : }
     566             : 
     567             : 
     568             : int
     569           0 : mnstr_fsetpos(stream *restrict s, fpos_t *restrict p)
     570             : {
     571           0 :         if (s == NULL)
     572             :                 return -1;
     573             : #ifdef STREAM_DEBUG
     574             :         fprintf(stderr, "fsetpos %s\n", s->name ? s->name : "<unnamed>");
     575             : #endif
     576           0 :         if (s->errkind != MNSTR_NO__ERROR)
     577             :                 return -1;
     578           0 :         if (s->fsetpos)
     579           0 :                 return s->fsetpos(s, p);
     580             :         return 0;
     581             : }
     582             : 
     583             : 
     584             : int
     585     8616075 : mnstr_isalive(const stream *s)
     586             : {
     587     8616075 :         if (s == NULL)
     588             :                 return 0;
     589     8616075 :         if (s->errkind != MNSTR_NO__ERROR)
     590             :                 return -1;
     591     8616075 :         if (s->isalive)
     592     7771597 :                 return s->isalive(s);
     593             :         return 1;
     594             : }
     595             : 
     596             : 
     597             : bool
     598      189222 : mnstr_eof(const stream *s)
     599             : {
     600      189222 :         return s->eof;
     601             : }
     602             : 
     603             : const char *
     604       25402 : mnstr_name(const stream *s)
     605             : {
     606       25402 :         if (s == NULL)
     607             :                 return "connection terminated";
     608       25402 :         return s->name;
     609             : }
     610             : 
     611             : 
     612             : mnstr_error_kind
     613     4946526 : mnstr_errnr(const stream *s)
     614             : {
     615     4946526 :         if (s == NULL)
     616             :                 return MNSTR_READ_ERROR;
     617     4946526 :         return s->errkind;
     618             : }
     619             : 
     620             : const char *
     621           0 : mnstr_error_kind_name(mnstr_error_kind k)
     622             : {
     623           0 :         switch (k) {
     624             :         case MNSTR_NO__ERROR:
     625             :                 return "MNSTR_NO__ERROR";
     626           0 :         case MNSTR_OPEN_ERROR:
     627           0 :                 return "MNSTR_OPEN_ERROR";
     628           0 :         case MNSTR_READ_ERROR:
     629           0 :                 return "MNSTR_READ_ERROR";
     630           0 :         case MNSTR_WRITE_ERROR:
     631           0 :                 return "MNSTR_WRITE_ERROR";
     632           0 :         case MNSTR_TIMEOUT:
     633           0 :                 return "MNSTR_TIMEOUT";
     634           0 :         default:
     635           0 :                 return "<UNKNOWN_ERROR>";
     636             :         }
     637             : 
     638             : }
     639             : void
     640           2 : mnstr_clearerr(stream *s)
     641             : {
     642           2 :         if (s != NULL) {
     643           2 :                 s->errkind = MNSTR_NO__ERROR;
     644           2 :                 s->errmsg[0] = '\0';
     645           2 :                 if (s->clrerr)
     646           0 :                         s->clrerr(s);
     647             :         }
     648           2 : }
     649             : 
     650             : 
     651             : bool
     652         605 : mnstr_isbinary(const stream *s)
     653             : {
     654         605 :         if (s == NULL)
     655             :                 return false;
     656         605 :         return s->binary;
     657             : }
     658             : 
     659             : 
     660             : bool
     661           0 : mnstr_get_swapbytes(const stream *s)
     662             : {
     663           0 :         if (s == NULL)
     664             :                 return 0;
     665           0 :         return s->swapbytes;
     666             : }
     667             : 
     668             : 
     669             : /* set stream to big-endian/little-endian byte order; the default is
     670             :  * native byte order */
     671             : void
     672       39586 : mnstr_set_bigendian(stream *s, bool bigendian)
     673             : {
     674       39586 :         if (s == NULL)
     675             :                 return;
     676             : #ifdef STREAM_DEBUG
     677             :         fprintf(stderr, "mnstr_set_bigendian %s %s\n",
     678             :                 s->name ? s->name : "<unnamed>",
     679             :                 swapbytes ? "true" : "false");
     680             : #endif
     681       39586 :         assert(s->readonly);
     682       39586 :         s->binary = true;
     683             : #ifdef WORDS_BIGENDIAN
     684             :         s->swapbytes = !bigendian;
     685             : #else
     686       39586 :         s->swapbytes = bigendian;
     687             : #endif
     688             : }
     689             : 
     690             : 
     691             : void
     692       67710 : close_stream(stream *s)
     693             : {
     694       67710 :         if (s) {
     695       67707 :                 if (s->close)
     696       67707 :                         s->close(s);
     697       67707 :                 if (s->destroy)
     698       67707 :                         s->destroy(s);
     699             :         }
     700       67710 : }
     701             : 
     702             : 
     703             : void
     704      387108 : destroy_stream(stream *s)
     705             : {
     706      387108 :         if (s->name)
     707      387108 :                 free(s->name);
     708      387108 :         free(s);
     709      387108 : }
     710             : 
     711             : 
     712             : stream *
     713      387188 : create_stream(const char *name)
     714             : {
     715      387188 :         stream *s;
     716             : 
     717      387188 :         if (name == NULL) {
     718           0 :                 mnstr_set_open_error(NULL, 0, "internal error: name not set");
     719           0 :                 return NULL;
     720             :         }
     721      387188 :         if ((s = (stream *) malloc(sizeof(*s))) == NULL) {
     722           0 :                 mnstr_set_open_error(name, errno, "malloc failed");
     723           0 :                 return NULL;
     724             :         }
     725      387188 :         *s = (stream) {
     726             :                 .swapbytes = false,
     727             :                 .readonly = true,
     728             :                 .isutf8 = false,        /* not known for sure */
     729             :                 .binary = false,
     730             :                 .eof = false,
     731      387188 :                 .name = strdup(name),
     732             :                 .errkind = MNSTR_NO__ERROR,
     733             :                 .errmsg = {0},
     734             :                 .destroy = destroy_stream,
     735             :         };
     736      387188 :         if(s->name == NULL) {
     737           0 :                 free(s);
     738           0 :                 mnstr_set_open_error(name, errno, "malloc failed");
     739           0 :                 return NULL;
     740             :         }
     741             : #ifdef STREAM_DEBUG
     742             :         fprintf(stderr, "create_stream %s -> %p\n",
     743             :                 name ? name : "<unnamed>", s);
     744             : #endif
     745      387188 :         mnstr_set_open_error(NULL, 0, NULL); // clear the error
     746      387188 :         return s;
     747             : }
     748             : 
     749             : 
     750             : static ssize_t
     751           0 : wrapper_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
     752             : {
     753           0 :         ssize_t ret = s->inner->read(s->inner, buf, elmsize, cnt);
     754           0 :         s->eof |= s->inner->eof;
     755           0 :         return ret;
     756             : }
     757             : 
     758             : 
     759             : static ssize_t
     760           0 : wrapper_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
     761             : {
     762           0 :         return s->inner->write(s->inner, buf, elmsize, cnt);
     763             : }
     764             : 
     765             : 
     766             : static void
     767           0 : wrapper_close(stream *s)
     768             : {
     769           0 :         s->inner->close(s->inner);
     770           0 : }
     771             : 
     772             : 
     773             : static void
     774           0 : wrapper_clrerr(stream *s)
     775             : {
     776           0 :         s->inner->clrerr(s->inner);
     777           0 : }
     778             : 
     779             : 
     780             : static void
     781           0 : wrapper_destroy(stream *s)
     782             : {
     783           0 :         s->inner->destroy(s->inner);
     784           0 :         destroy_stream(s);
     785           0 : }
     786             : 
     787             : 
     788             : static int
     789           0 : wrapper_flush(stream *s, mnstr_flush_level flush_level)
     790             : {
     791           0 :         return s->inner->flush(s->inner, flush_level);
     792             : }
     793             : 
     794             : 
     795             : static int
     796           4 : wrapper_fsync(stream *s)
     797             : {
     798           4 :         return s->inner->fsync(s->inner);
     799             : }
     800             : 
     801             : 
     802             : static int
     803           0 : wrapper_fgetpos(stream *restrict s, fpos_t *restrict p)
     804             : {
     805           0 :         return s->inner->fgetpos(s->inner, p);
     806             : }
     807             : 
     808             : 
     809             : static int
     810           0 : wrapper_fsetpos(stream *restrict s, fpos_t *restrict p)
     811             : {
     812           0 :         return s->inner->fsetpos(s->inner, p);
     813             : }
     814             : 
     815             : 
     816             : static void
     817       38206 : wrapper_update_timeout(stream *s)
     818             : {
     819       38206 :         s->inner->timeout = s->timeout;
     820       38206 :         s->inner->timeout_func = s->timeout_func;
     821       38206 :         s->inner->timeout_data = s->timeout_data;
     822       38206 :         s->inner->update_timeout(s->inner);
     823       38205 : }
     824             : 
     825             : 
     826             : static int
     827     7771766 : wrapper_isalive(const stream *s)
     828             : {
     829     7771766 :         return s->inner->isalive(s->inner);
     830             : }
     831             : 
     832             : 
     833             : stream *
     834       80579 : create_wrapper_stream(const char *name, stream *inner)
     835             : {
     836       80579 :         if (inner == NULL)
     837             :                 return NULL;
     838       80579 :         if (name == NULL)
     839       80575 :                 name = inner->name;
     840       80579 :         stream *s = create_stream(name);
     841       80579 :         if (s == NULL)
     842             :                 return NULL;
     843             : 
     844             : 
     845       80579 :         s->swapbytes = inner->swapbytes;
     846       80579 :         s->readonly = inner->readonly;
     847       80579 :         s->isutf8 = inner->isutf8;
     848       80579 :         s->binary = inner->binary;
     849       80579 :         s->timeout = inner->timeout;
     850       80579 :         s->inner = inner;
     851             : 
     852       80579 :         s->read = inner->read == NULL ? NULL : wrapper_read;
     853       80579 :         s->write = inner->write == NULL ? NULL : wrapper_write;
     854       80579 :         s->close = inner->close == NULL ? NULL : wrapper_close;
     855       80579 :         s->clrerr = inner->clrerr == NULL ? NULL : wrapper_clrerr;
     856       80579 :         s->destroy = wrapper_destroy;
     857       80579 :         s->flush = inner->flush == NULL ? NULL : wrapper_flush;
     858       80579 :         s->fsync = inner->fsync == NULL ? NULL : wrapper_fsync;
     859       80579 :         s->fgetpos = inner->fgetpos == NULL ? NULL : wrapper_fgetpos;
     860       80579 :         s->fsetpos = inner->fsetpos == NULL ? NULL : wrapper_fsetpos;
     861       80579 :         s->isalive = inner->isalive == NULL ? NULL : wrapper_isalive;
     862       80579 :         s->update_timeout = inner->update_timeout == NULL ? NULL : wrapper_update_timeout;
     863             : 
     864       80579 :         return s;
     865             : }
     866             : 
     867             : /* ------------------------------------------------------------------ */
     868             : /* streams working on a disk file, compressed or not */
     869             : 
     870             : stream *
     871       13374 : open_rstream(const char *filename)
     872             : {
     873       13374 :         if (filename == NULL)
     874             :                 return NULL;
     875             : #ifdef STREAM_DEBUG
     876             :         fprintf(stderr, "open_rstream %s\n", filename);
     877             : #endif
     878             : 
     879       13374 :         stream *s = open_stream(filename, "rb");
     880       13374 :         if (s == NULL)
     881             :                 return NULL;
     882             : 
     883       13247 :         stream *c = compressed_stream(s, 0);
     884       13247 :         if (c == NULL)
     885           0 :                 close_stream(s);
     886             : 
     887             :         return c;
     888             : }
     889             : 
     890             : stream *
     891       12154 : open_wstream(const char *filename)
     892             : {
     893       12154 :         if (filename == NULL)
     894             :                 return NULL;
     895             : #ifdef STREAM_DEBUG
     896             :         fprintf(stderr, "open_wstream %s\n", filename);
     897             : #endif
     898             : 
     899       12154 :         stream *s = open_stream(filename, "wb");
     900       12154 :         if (s == NULL)
     901             :                 return NULL;
     902             : 
     903       12154 :         stream *c = compressed_stream(s, 0);
     904       12154 :         if (c == NULL) {
     905           0 :                 close_stream(s);
     906           0 :                 (void) file_remove(filename);
     907             :         }
     908             : 
     909             :         return c;
     910             : }
     911             : 
     912             : stream *
     913         312 : open_rastream(const char *filename)
     914             : {
     915         312 :         if (filename == NULL)
     916             :                 return NULL;
     917             : #ifdef STREAM_DEBUG
     918             :         fprintf(stderr, "open_rastream %s\n", filename);
     919             : #endif
     920         312 :         stream *s = open_rstream(filename);
     921         312 :         if (s == NULL)
     922             :                 return NULL;
     923             : 
     924         297 :         stream *t = create_text_stream(s);
     925         297 :         if (t == NULL)
     926           0 :                 close_stream(s);
     927             : 
     928             :         return t;
     929             : }
     930             : 
     931             : stream *
     932           5 : open_wastream(const char *filename)
     933             : {
     934           5 :         if (filename == NULL)
     935             :                 return NULL;
     936             : #ifdef STREAM_DEBUG
     937             :         fprintf(stderr, "open_wastream %s\n", filename);
     938             : #endif
     939           5 :         stream *s = open_wstream(filename);
     940           5 :         if (s == NULL)
     941             :                 return NULL;
     942             : 
     943           5 :         stream *t = create_text_stream(s);
     944           5 :         if (t == NULL) {
     945           0 :                 close_stream(s);
     946           0 :                 (void) file_remove(filename);
     947             :         }
     948             : 
     949             :         return t;
     950             : }
     951             : 
     952             : 
     953             : /* put here because it depends on both bs_read AND bs2_read */
     954             : bool
     955      465942 : isa_block_stream(const stream *s)
     956             : {
     957      465942 :         assert(s != NULL);
     958      931884 :         return s &&
     959      465942 :                 ((s->read == bs_read ||
     960        3300 :                   s->write == bs_write));
     961             : }
     962             : 
     963             : 
     964             : /* Put here because I need to think very carefully about this
     965             :  * mnstr_read(,, 0, 0). What would that mean?
     966             :  */
     967             : ssize_t
     968       39759 : mnstr_read_block(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
     969             : {
     970       39759 :         ssize_t len = 0;
     971       39759 :         char x = 0;
     972             : 
     973       39759 :         if (s == NULL || buf == NULL)
     974             :                 return -1;
     975       39759 :         assert(s->read == bs_read || s->write == bs_write);
     976       79518 :         if ((len = mnstr_read(s, buf, elmsize, cnt)) < 0 ||
     977       39759 :             mnstr_read(s, &x, 0, 0) < 0 /* read prompt */  ||
     978       39759 :             x > 0)
     979           1 :                 return -1;
     980             :         return len;
     981             : }

Generated by: LCOV version 1.14