Mercurial > hg > MonetDB
changeset 86170:254dcf085ee5 Jul2021
Refactor BBP.dir reading code. Use a function to read a single line.
This function is now also used for the TAILCHKMASK check just before the
actual commit.
Manual backport of changeset 11d1a723b150.
| author | Sjoerd Mullender <sjoerd@acm.org> |
|---|---|
| date | Wed, 27 Jul 2022 16:43:31 +0200 |
| parents | 748a688a1700 |
| children | 73247d49ccd2 |
| files | gdk/gdk_bbp.c |
| diffstat | 1 files changed, 339 insertions(+), 273 deletions(-) [+] |
line wrap: on
line diff
--- a/gdk/gdk_bbp.c +++ b/gdk/gdk_bbp.c @@ -428,7 +428,7 @@ static gdk_return BBPrecover_subdir(void static bool BBPdiskscan(const char *, size_t); static int -heapinit(BAT *b, const char *buf, int *hashash, unsigned bbpversion, bat bid, const char *filename, int lineno) +heapinit(BAT *b, const char *buf, int *hashash, unsigned bbpversion, const char *filename, int lineno) { int t; char type[33]; @@ -488,7 +488,7 @@ heapinit(BAT *b, const char *buf, int *h return -1; } } else if (var != (t == TYPE_void || BATatoms[t].atomPut != NULL)) { - TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tvarsized mismatch for BAT %d on line %d\n", (int) bid, lineno); + TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tvarsized mismatch for BAT %d on line %d\n", (int) b->batCacheid, lineno); return -1; } else if (var && t != 0 ? ATOMsize(t) < width || @@ -498,7 +498,7 @@ heapinit(BAT *b, const char *buf, int *h #endif ) : ATOMsize(t) != width) { - TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tsize mismatch for BAT %d on line %d\n", (int) bid, lineno); + TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tsize mismatch for BAT %d on line %d\n", (int) b->batCacheid, lineno); return -1; } b->ttype = t; @@ -541,7 +541,7 @@ heapinit(BAT *b, const char *buf, int *h } static int -vheapinit(BAT *b, const char *buf, int hashash, bat bid, const char *filename, int lineno) +vheapinit(BAT *b, const char *buf, int hashash, const char *filename, int lineno) { int n = 0; uint64_t free, size; @@ -555,11 +555,6 @@ vheapinit(BAT *b, const char *buf, int h TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno); return -1; } - b->tvheap = GDKmalloc(sizeof(Heap)); - if (b->tvheap == NULL) { - TRC_CRITICAL(GDK, "cannot allocate memory for heap."); - return -1; - } if (b->ttype >= 0 && ATOMstorage(b->ttype) == TYPE_str && free < GDK_STRHASHTABLE * sizeof(stridx_t) + BATTINY * GDK_VARALIGN) @@ -577,108 +572,216 @@ vheapinit(BAT *b, const char *buf, int h .cleanhash = true, .newstorage = STORE_INVALID, .dirty = false, - .parentid = bid, + .parentid = b->batCacheid, .farmid = BBPselectfarm(PERSISTENT, b->ttype, varheap), }; strconcat_len(b->tvheap->filename, sizeof(b->tvheap->filename), filename, ".theap", NULL); - ATOMIC_INIT(&b->tvheap->refs, 1); + } else { + b->tvheap = NULL; } return n; } +/* read a single line from the BBP.dir file (file pointer fp) and fill + * in the structure pointed to by bn and extra information through the + * other pointers; this function does not allocate any memory; return 0 + * on end of file, 1 on success, and -1 on failure */ +static int +BBPreadBBPline(FILE *fp, unsigned bbpversion, int *lineno, BAT *bn, + int *hashash, + char *batname, char *filename, char **options) +{ + char buf[4096]; + uint64_t batid; + uint16_t status; + unsigned int properties; + int nread, n; + char *s; + uint64_t count, capacity = 0, base = 0; + + if (fgets(buf, sizeof(buf), fp) == NULL) { + if (ferror(fp)) { + TRC_CRITICAL(GDK, "error reading BBP.dir on line %d\n", *lineno); + return -1; + } + return 0; /* end of file */ + } + (*lineno)++; + if ((s = strchr(buf, '\r')) != NULL) { + /* convert \r\n into just \n */ + if (s[1] != '\n') { + TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", *lineno); + return -1; + } + *s++ = '\n'; + *s = 0; + } + + if (sscanf(buf, + "%" SCNu64 " %" SCNu16 " %128s %19s %u %" SCNu64 + " %" SCNu64 " %" SCNu64 + "%n", + &batid, &status, batname, filename, + &properties, + &count, &capacity, &base, + &nread) < 8) { + TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", *lineno); + return -1; + } + + if (batid >= N_BBPINIT * BBPINIT) { + TRC_CRITICAL(GDK, "bat ID (%" PRIu64 ") too large to accomodate (max %d), on line %d.", batid, N_BBPINIT * BBPINIT - 1, *lineno); + return -1; + } + + /* convert both / and \ path separators to our own DIR_SEP */ +#if DIR_SEP != '/' + s = filename; + while ((s = strchr(s, '/')) != NULL) + *s++ = DIR_SEP; +#endif +#if DIR_SEP != '\\' + s = filename; + while ((s = strchr(s, '\\')) != NULL) + *s++ = DIR_SEP; +#endif + + bn->batCacheid = (bat) batid; + BATinit_idents(bn); + bn->batTransient = false; + bn->batCopiedtodisk = true; + switch ((properties & 0x06) >> 1) { + case 0: + bn->batRestricted = BAT_WRITE; + break; + case 1: + bn->batRestricted = BAT_READ; + break; + case 2: + bn->batRestricted = BAT_APPEND; + break; + default: + TRC_CRITICAL(GDK, "incorrect batRestricted value"); + return -1; + } + bn->batCount = (BUN) count; + bn->batInserted = bn->batCount; + /* set capacity to at least count */ + bn->batCapacity = (BUN) count <= BATTINY ? BATTINY : (BUN) count; + + if (base > (uint64_t) GDK_oid_max) { + TRC_CRITICAL(GDK, "head seqbase out of range (ID = %" PRIu64 ", seq = %" PRIu64 ") on line %d.", batid, base, *lineno); + return -1; + } + bn->hseqbase = (oid) base; + n = heapinit(bn, buf + nread, + hashash, + bbpversion, filename, *lineno); + if (n < 0) { + return -1; + } + nread += n; + n = vheapinit(bn, buf + nread, *hashash, filename, *lineno); + if (n < 0) { + return -1; + } + nread += n; + + if (buf[nread] != '\n' && buf[nread] != ' ') { + TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", *lineno); + return -1; + } + *options = (buf[nread] == ' ') ? buf + nread + 1 : NULL; + return 1; +} + static gdk_return BBPreadEntries(FILE *fp, unsigned bbpversion, int lineno) { - bat bid = 0; - char buf[4096]; - /* read the BBP.dir and insert the BATs into the BBP */ - while (fgets(buf, sizeof(buf), fp) != NULL) { - BAT *bn; - uint64_t batid; - uint16_t status; + for (;;) { + BAT b; + Heap h; + Heap vh; + vh = h = (Heap) { + .free = 0, + }; + b = (BAT) { + .theap = &h, + .tvheap = &vh, + }; + char *options; char headname[129]; char filename[sizeof(BBP_physical(0))]; - unsigned int properties; - int nread, n; - char *s, *options = NULL; char logical[1024]; - uint64_t count, capacity, base = 0; int Thashash; - lineno++; - if ((s = strchr(buf, '\r')) != NULL) { - /* convert \r\n into just \n */ - if (s[1] != '\n') { - TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno); - return GDK_FAIL; - } - *s++ = '\n'; - *s = 0; - } - - if (sscanf(buf, - "%" SCNu64 " %" SCNu16 " %128s %19s %u %" SCNu64 - " %" SCNu64 " %" SCNu64 - "%n", - &batid, &status, headname, filename, - &properties, - &count, &capacity, &base, - &nread) < 8) { - TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno); - return GDK_FAIL; + switch (BBPreadBBPline(fp, bbpversion, &lineno, &b, + &Thashash, + headname, filename, &options)) { + case 0: + /* end of file */ + return GDK_SUCCEED; + case 1: + /* successfully read an entry */ + break; + default: + /* error */ + goto bailout; } - if (batid >= N_BBPINIT * BBPINIT) { - TRC_CRITICAL(GDK, "bat ID (%" PRIu64 ") too large to accomodate (max %d), on line %d.", batid, N_BBPINIT * BBPINIT - 1, lineno); - return GDK_FAIL; + if (b.batCacheid >= N_BBPINIT * BBPINIT) { + TRC_CRITICAL(GDK, "bat ID (%d) too large to accommodate (max %d), on line %d.", b.batCacheid, N_BBPINIT * BBPINIT - 1, lineno); + goto bailout; + } + + if (b.batCacheid >= (bat) ATOMIC_GET(&BBPsize)) { + if ((bat) ATOMIC_GET(&BBPsize) + 1 >= BBPlimit && + BBPextend(0, false, b.batCacheid + 1) != GDK_SUCCEED) + goto bailout; + ATOMIC_SET(&BBPsize, b.batCacheid + 1); } - - /* convert both / and \ path separators to our own DIR_SEP */ -#if DIR_SEP != '/' - s = filename; - while ((s = strchr(s, '/')) != NULL) - *s++ = DIR_SEP; -#endif -#if DIR_SEP != '\\' - s = filename; - while ((s = strchr(s, '\\')) != NULL) - *s++ = DIR_SEP; -#endif - - bid = (bat) batid; - if (batid >= (uint64_t) ATOMIC_GET(&BBPsize)) { - if ((bat) ATOMIC_GET(&BBPsize) + 1 >= BBPlimit && - BBPextend(0, false, bid + 1) != GDK_SUCCEED) - return GDK_FAIL; - ATOMIC_SET(&BBPsize, bid + 1); + if (BBP_desc(b.batCacheid) != NULL) { + TRC_CRITICAL(GDK, "duplicate entry in BBP.dir (ID = " + "%d) on line %d.", b.batCacheid, lineno); + goto bailout; } - if (BBP_desc(bid) != NULL) { - TRC_CRITICAL(GDK, "duplicate entry in BBP.dir (ID = " - "%" PRIu64 ") on line %d.", batid, lineno); - return GDK_FAIL; - } + + BAT *bn; + Heap *hn; if ((bn = GDKzalloc(sizeof(BAT))) == NULL || - (bn->theap = GDKzalloc(sizeof(Heap))) == NULL) { + (hn = GDKzalloc(sizeof(Heap))) == NULL) { GDKfree(bn); TRC_CRITICAL(GDK, "cannot allocate memory for BAT."); - return GDK_FAIL; + goto bailout; } - bn->batCacheid = bid; - if (BATroles(bn, NULL) != GDK_SUCCEED) { - GDKfree(bn->theap); + *bn = b; + *hn = h; + bn->theap = hn; + if (options && + (options = GDKstrdup(options)) == NULL) { + GDKfree(hn); GDKfree(bn); - TRC_CRITICAL(GDK, "BATroles failed."); - return GDK_FAIL; + PROPdestroy_nolock(&b); + TRC_CRITICAL(GDK, "GDKstrdup failed\n"); + goto bailout; } - bn->batTransient = false; - bn->batCopiedtodisk = true; - bn->batRestricted = (properties & 0x06) >> 1; - bn->batCount = (BUN) count; - bn->batInserted = bn->batCount; - /* set capacity to at least count */ - bn->batCapacity = (BUN) count <= BATTINY ? BATTINY : (BUN) count; + if (b.tvheap) { + Heap *vhn; + assert(b.tvheap == &vh); + if ((vhn = GDKmalloc(sizeof(Heap))) == NULL) { + GDKfree(hn); + GDKfree(bn); + GDKfree(options); + TRC_CRITICAL(GDK, "cannot allocate memory for BAT."); + goto bailout; + } + *vhn = vh; + bn->tvheap = vhn; + ATOMIC_INIT(&bn->tvheap->refs, 1); + } + char name[MT_NAME_LEN]; snprintf(name, sizeof(name), "heaplock%d", bn->batCacheid); /* fits */ MT_lock_init(&bn->theaplock, name); @@ -688,79 +791,46 @@ BBPreadEntries(FILE *fp, unsigned bbpver MT_rwlock_init(&bn->thashlock, name); ATOMIC_INIT(&bn->theap->refs, 1); - if (base > (uint64_t) GDK_oid_max) { - BATdestroy(bn); - TRC_CRITICAL(GDK, "head seqbase out of range (ID = %" PRIu64 ", seq = %" PRIu64 ") on line %d.", batid, base, lineno); - return GDK_FAIL; - } - bn->hseqbase = (oid) base; - n = heapinit(bn, buf + nread, &Thashash, bbpversion, bid, filename, lineno); - if (n < 0) { - BATdestroy(bn); - return GDK_FAIL; - } - nread += n; - n = vheapinit(bn, buf + nread, Thashash, bid, filename, lineno); - if (n < 0) { - BATdestroy(bn); - return GDK_FAIL; - } - nread += n; - - if (buf[nread] != '\n' && buf[nread] != ' ') { - BATdestroy(bn); - TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno); - return GDK_FAIL; - } - if (buf[nread] == ' ') - options = buf + nread + 1; - - if (snprintf(BBP_bak(bid), sizeof(BBP_bak(bid)), "tmp_%o", (unsigned) bid) >= (int) sizeof(BBP_bak(bid))) { + if (snprintf(BBP_bak(b.batCacheid), sizeof(BBP_bak(b.batCacheid)), "tmp_%o", (unsigned) b.batCacheid) >= (int) sizeof(BBP_bak(b.batCacheid))) { BATdestroy(bn); TRC_CRITICAL(GDK, "BBP logical filename directory is too large, on line %d\n", lineno); - return GDK_FAIL; + goto bailout; } + char *s; if ((s = strchr(headname, '~')) != NULL && s == headname) { - /* sizeof(logical) > sizeof(BBP_bak(bid)), so + /* sizeof(logical) > sizeof(BBP_bak(b.batCacheid)), so * this fits */ - strcpy(logical, BBP_bak(bid)); + strcpy(logical, BBP_bak(b.batCacheid)); } else { if (s) *s = 0; strcpy_len(logical, headname, sizeof(logical)); } - if (strcmp(logical, BBP_bak(bid)) == 0) { - BBP_logical(bid) = BBP_bak(bid); + if (strcmp(logical, BBP_bak(b.batCacheid)) == 0) { + BBP_logical(b.batCacheid) = BBP_bak(b.batCacheid); } else { - BBP_logical(bid) = GDKstrdup(logical); - if (BBP_logical(bid) == NULL) { + BBP_logical(b.batCacheid) = GDKstrdup(logical); + if (BBP_logical(b.batCacheid) == NULL) { BATdestroy(bn); TRC_CRITICAL(GDK, "GDKstrdup failed\n"); - return GDK_FAIL; + goto bailout; } } - /* tailname is ignored */ - strcpy_len(BBP_physical(bid), filename, sizeof(BBP_physical(bid))); + strcpy_len(BBP_physical(b.batCacheid), filename, sizeof(BBP_physical(b.batCacheid))); #ifdef __COVERITY__ /* help coverity */ - BBP_physical(bid)[sizeof(BBP_physical(bid)) - 1] = 0; + BBP_physical(b.batCacheid)[sizeof(BBP_physical(b.batCacheid)) - 1] = 0; #endif - BBP_options(bid) = NULL; - if (options) { - BBP_options(bid) = GDKstrdup(options); - if (BBP_options(bid) == NULL) { - BATdestroy(bn); - TRC_CRITICAL(GDK, "GDKstrdup failed\n"); - return GDK_FAIL; - } - } - BBP_refs(bid) = 0; - BBP_lrefs(bid) = 1; /* any BAT we encounter here is persistent, so has a logical reference */ - BBP_desc(bid) = bn; - BBP_pid(bid) = 0; - BBP_status_set(bid, BBPEXISTING); /* do we need other status bits? */ + BBP_options(b.batCacheid) = options; + BBP_refs(b.batCacheid) = 0; + BBP_lrefs(b.batCacheid) = 1; /* any BAT we encounter here is persistent, so has a logical reference */ + BBP_desc(b.batCacheid) = bn; + BBP_pid(b.batCacheid) = 0; + BBP_status_set(b.batCacheid, BBPEXISTING); /* do we need other status bits? */ } - return GDK_SUCCEED; + + bailout: + return GDK_FAIL; } /* check that the necessary files for all BATs exist and are large @@ -879,7 +949,7 @@ BBPcheckbats(unsigned bbpversion) #endif static unsigned -BBPheader(FILE *fp, int *lineno, bat *bbpsize) +BBPheader(FILE *fp, int *lineno, bat *bbpsize, lng *logno, lng *transid) { char buf[BUFSIZ]; int sz, ptrsize, oidsize, intsize; @@ -942,10 +1012,12 @@ BBPheader(FILE *fp, int *lineno, bat *bb TRC_CRITICAL(GDK, "short BBP"); return 0; } - if (sscanf(buf, "BBPinfo=" LLSCN " " LLSCN, &BBPlogno, &BBPtransid) != 2) { + if (sscanf(buf, "BBPinfo=" LLSCN " " LLSCN, logno, transid) != 2) { TRC_CRITICAL(GDK, "no info value found\n"); return 0; } + } else { + *logno = *transid = 0; } return bbpversion; } @@ -1300,11 +1372,16 @@ BBPinit(bool first) if (GDKinmemory(0)) { bbpversion = GDKLIBRARY; } else { - bbpversion = BBPheader(fp, &lineno, &bbpsize); + lng logno, transid; + bbpversion = BBPheader(fp, &lineno, &bbpsize, &logno, &transid); if (bbpversion == 0) { GDKdebug = dbg; return GDK_FAIL; } + assert(bbpversion > GDKLIBRARY_MINMAX_POS || logno == 0); + assert(bbpversion > GDKLIBRARY_MINMAX_POS || transid == 0); + ATOMIC_SET(&BBPlogno, logno); + ATOMIC_SET(&BBPtransid, transid); } /* allocate BBP records */ @@ -1534,22 +1611,6 @@ heap_entry(FILE *fp, BATiter *bi, BUN si free = 0; } - if ((GDKdebug & TAILCHKMASK) && free > 0) { - char *fname = GDKfilepath(0, BATDIR, BBP_physical(b->batCacheid), gettailnamebi(bi)); - if (fname != NULL) { - struct stat stb; - if (stat(fname, &stb) == -1) { - assert(0); - TRC_WARNING(GDK, "file %s not found (expected size %zu)\n", fname, free); - } else { - assert((size_t) stb.st_size >= free); - if ((size_t) stb.st_size < free) - TRC_WARNING(GDK, "file %s too small (expected %zu, actual %zu)\n", fname, free, (size_t) stb.st_size); - } - GDKfree(fname); - } - } - return fprintf(fp, " %s %d %d %d " BUNFMT " " BUNFMT " " BUNFMT " " BUNFMT " " OIDFMT " %zu %zu %d " OIDFMT " " OIDFMT, bi->type >= 0 ? BATatoms[bi->type].name : ATOMunknown_name(bi->type), @@ -1579,20 +1640,6 @@ vheap_entry(FILE *fp, BATiter *bi, BUN s (void) size; if (bi->vh == NULL) return 0; - if ((GDKdebug & TAILCHKMASK) && size > 0) { - char *fname = GDKfilepath(0, BATDIR, BBP_physical(bi->vh->parentid), "theap"); - if (fname != NULL) { - struct stat stb; - if (stat(fname, &stb) == -1) { - assert(0); - TRC_WARNING(GDK, "file %s not found (expected size %zu)\n", fname, bi->vhfree); - } else if ((size_t) stb.st_size < bi->vhfree) { - /* no assert since this can actually happen */ - TRC_WARNING(GDK, "file %s too small (expected %zu, actual %zu)\n", fname, bi->vhfree, (size_t) stb.st_size); - } - GDKfree(fname); - } - } return fprintf(fp, " %zu %zu %d", bi->vhfree, size == 0 ? 0 : bi->vh->size, 0); } @@ -1724,113 +1771,13 @@ BBPdir_first(bool subcommit, lng logno, static bat BBPdir_step(bat bid, BUN size, int n, char *buf, size_t bufsize, - FILE **obbpfp, FILE *nbbpf, bool subcommit, BATiter *bi, + FILE **obbpfp, FILE *nbbpf, BATiter *bi, oid minpos, oid maxpos) { if (n < -1) /* safety catch */ return n; while (n >= 0 && n < bid) { if (n > 0) { - if (GDKdebug & TAILCHKMASK) { - uint64_t batid, free, vfree; - char filename[sizeof(BBP_physical(0))]; - char type[33]; - uint16_t width; - char *fname; - struct stat stb; - switch (sscanf(buf, "%" SCNu64 " %*u %*s %19s %*u %*u %*u %*u %10s %" SCNu16 " %*u %*u %*u %*u %*u %*u %*u %" SCNu64 " %*u %*u %*u %*u %" SCNu64 " %*u %*u", - &batid, filename, type, &width, &free, &vfree)) { - case 5: - vfree = 0; - /* fall through */ - case 6: - assert(batid == (uint64_t) n); - if (free == 0) - break; - const char *tailname = "tail"; - if (strcmp(type, "str") == 0) { - switch (width) { - case 1: - tailname = "tail1"; - break; - case 2: - tailname = "tail2"; - break; -#if SIZEOF_VAR_T == 8 - case 4: - tailname = "tail4"; - break; -#endif - } - } - if (subcommit) { - char base[32]; - snprintf(base, sizeof(base), "%" PRIo64, batid); - fname = GDKfilepath(0, BAKDIR, base, tailname); - } else { - fname = GDKfilepath(0, BATDIR, filename, tailname); - } - if (fname == NULL) - break; - bool found = true; - if (stat(fname, &stb) == -1) { - if (subcommit) { - char *fname1 = GDKfilepath(0, BATDIR, filename, tailname); - if (fname1 == NULL) { - GDKfree(fname); - break; - } - if (stat(fname1, &stb) == -1) { - assert(0); - found = false; - GDKfree(fname1); - } else { - GDKfree(fname); - fname = fname1; - } - } else { - assert(0); - found = false; - } - } - if (!found) { - TRC_WARNING(GDK, "file %s not found (expected size %" PRIu64 ")\n", fname, free); - } else { - assert((uint64_t) stb.st_size >= free); - if ((uint64_t) stb.st_size < free) - TRC_WARNING(GDK, "file %s too small (expected %" PRIu64 ", actual %zu)\n", fname, free, (size_t) stb.st_size); - } - GDKfree(fname); - if (vfree == 0) - break; - if (subcommit) { - char base[32]; - snprintf(base, sizeof(base), "%" PRIo64, batid); - fname = GDKfilepath(0, BAKDIR, base, "theap"); - } else { - fname = GDKfilepath(0, BATDIR, filename, "theap"); - } - if (fname == NULL) - break; - if (stat(fname, &stb) == -1) { - if (subcommit) { - GDKfree(fname); - fname = GDKfilepath(0, BATDIR, filename, "theap"); - if (fname == NULL) - break; - if (stat(fname, &stb) == -1) - assert(0); - } else { - assert(0); - } - } - assert((uint64_t) stb.st_size >= vfree); - if ((uint64_t) stb.st_size < vfree) - TRC_WARNING(GDK, "file %s too small (expected %" PRIu64 ", actual %zu)\n", fname, vfree, (size_t) stb.st_size); - GDKfree(fname); - break; - } - } if (fputs(buf, nbbpf) == EOF) { GDKerror("Writing BBP.dir file failed.\n"); goto bailout; @@ -3553,6 +3500,122 @@ BBPbackup(BAT *b, bool subcommit) return GDK_FAIL; } +static inline void +BBPcheckHeap(bool subcommit, Heap *h) +{ + struct stat statb; + char *path; + + if (subcommit) { + char *s = strrchr(h->filename, DIR_SEP); + if (s) + s++; + else + s = h->filename; + path = GDKfilepath(0, BAKDIR, s, NULL); + if (path == NULL) + return; + if (MT_stat(path, &statb) < 0) { + GDKfree(path); + path = GDKfilepath(0, BATDIR, h->filename, NULL); + if (path == NULL) + return; + if (MT_stat(path, &statb) < 0) { + assert(0); + GDKsyserror("cannot stat file %s (expected size %zu)\n", + path, h->free); + GDKfree(path); + return; + } + } + } else { + path = GDKfilepath(0, BATDIR, h->filename, NULL); + if (path == NULL) + return; + if (MT_stat(path, &statb) < 0) { + assert(0); + GDKsyserror("cannot stat file %s (expected size %zu)\n", + path, h->free); + GDKfree(path); + return; + } + } + assert((statb.st_mode & S_IFMT) == S_IFREG); + assert((size_t) statb.st_size >= h->free); + if ((size_t) statb.st_size < h->free) { + GDKerror("file %s too small (expected %zu, actual %zu)\n", path, h->free, (size_t) statb.st_size); + GDKfree(path); + return; + } + GDKfree(path); +} + +static void +BBPcheckBBPdir(bool subcommit) +{ + FILE *fp; + int lineno = 0; + bat bbpsize = 0; + unsigned bbpversion; + lng logno, transid; + + fp = GDKfileopen(0, BATDIR, "BBP", "dir", "r"); + assert(fp != NULL); + if (fp == NULL) + return; + bbpversion = BBPheader(fp, &lineno, &bbpsize, &logno, &transid); + if (bbpversion == 0) { + fclose(fp); + return; /* error reading file */ + } + assert(bbpversion == GDKLIBRARY); + + for (;;) { + BAT b; + Heap h; + Heap vh; + vh = h = (Heap) { + .free = 0, + }; + b = (BAT) { + .theap = &h, + .tvheap = &vh, + }; + char *options; + char filename[sizeof(BBP_physical(0))]; + char batname[129]; + int hashash; + + switch (BBPreadBBPline(fp, bbpversion, &lineno, &b, + &hashash, + batname, filename, &options)) { + case 0: + /* end of file */ + fclose(fp); + return; + case 1: + /* successfully read an entry */ + break; + default: + /* error */ + fclose(fp); + return; + } + assert(hashash == 0); + assert(b.batCacheid < (bat) ATOMIC_GET(&BBPsize)); + assert(BBP_desc(b.batCacheid) != NULL); + assert(b.hseqbase <= GDK_oid_max); + if (b.ttype == TYPE_void) { + /* no files needed */ + continue; + } + if (b.theap->free > 0) + BBPcheckHeap(subcommit, b.theap); + if (b.tvheap != NULL && b.tvheap->free > 0) + BBPcheckHeap(subcommit, b.tvheap); + } +} + /* * @+ Atomic Write * The atomic BBPsync() function first safeguards the old images of @@ -3730,7 +3793,7 @@ BBPsync(int cnt, bat *restrict subcommit bi = bat_iterator(NULL); } if (ret == GDK_SUCCEED) { - n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf, nbbpf, subcommit != NULL, &bi, (BUN) minpos, (BUN) maxpos); + n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf, nbbpf, &bi, (BUN) minpos, (BUN) maxpos); } bat_iterator_end(&bi); if (n == -2) @@ -3757,6 +3820,9 @@ BBPsync(int cnt, bat *restrict subcommit * succeeded, so no changing of ret after this * call anymore */ + if ((GDKdebug & TAILCHKMASK) && !GDKinmemory(0)) + BBPcheckBBPdir(subcommit != NULL); + if (MT_rename(bakdir, deldir) < 0 && /* maybe there was an old deldir, so remove and try again */ (GDKremovedir(0, DELDIR) != GDK_SUCCEED ||
