changeset 86170:254dcf085ee5 Jul2021

Refactor BBP.dir reading code. Use a function to read a single line. This function is now also used for the TAILCHKMASK check just before the actual commit. Manual backport of changeset 11d1a723b150.
author Sjoerd Mullender <sjoerd@acm.org>
date Wed, 27 Jul 2022 16:43:31 +0200
parents 748a688a1700
children 73247d49ccd2
files gdk/gdk_bbp.c
diffstat 1 files changed, 339 insertions(+), 273 deletions(-) [+]
line wrap: on
line diff
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -428,7 +428,7 @@ static gdk_return BBPrecover_subdir(void
 static bool BBPdiskscan(const char *, size_t);
 
 static int
-heapinit(BAT *b, const char *buf, int *hashash, unsigned bbpversion, bat bid, const char *filename, int lineno)
+heapinit(BAT *b, const char *buf, int *hashash, unsigned bbpversion, const char *filename, int lineno)
 {
 	int t;
 	char type[33];
@@ -488,7 +488,7 @@ heapinit(BAT *b, const char *buf, int *h
 			return -1;
 		}
 	} else if (var != (t == TYPE_void || BATatoms[t].atomPut != NULL)) {
-		TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tvarsized mismatch for BAT %d on line %d\n", (int) bid, lineno);
+		TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tvarsized mismatch for BAT %d on line %d\n", (int) b->batCacheid, lineno);
 		return -1;
 	} else if (var && t != 0 ?
 		   ATOMsize(t) < width ||
@@ -498,7 +498,7 @@ heapinit(BAT *b, const char *buf, int *h
 #endif
 			   ) :
 		   ATOMsize(t) != width) {
-		TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tsize mismatch for BAT %d on line %d\n", (int) bid, lineno);
+		TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tsize mismatch for BAT %d on line %d\n", (int) b->batCacheid, lineno);
 		return -1;
 	}
 	b->ttype = t;
@@ -541,7 +541,7 @@ heapinit(BAT *b, const char *buf, int *h
 }
 
 static int
-vheapinit(BAT *b, const char *buf, int hashash, bat bid, const char *filename, int lineno)
+vheapinit(BAT *b, const char *buf, int hashash, const char *filename, int lineno)
 {
 	int n = 0;
 	uint64_t free, size;
@@ -555,11 +555,6 @@ vheapinit(BAT *b, const char *buf, int h
 			TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno);
 			return -1;
 		}
-		b->tvheap = GDKmalloc(sizeof(Heap));
-		if (b->tvheap == NULL) {
-			TRC_CRITICAL(GDK, "cannot allocate memory for heap.");
-			return -1;
-		}
 		if (b->ttype >= 0 &&
 		    ATOMstorage(b->ttype) == TYPE_str &&
 		    free < GDK_STRHASHTABLE * sizeof(stridx_t) + BATTINY * GDK_VARALIGN)
@@ -577,108 +572,216 @@ vheapinit(BAT *b, const char *buf, int h
 			.cleanhash = true,
 			.newstorage = STORE_INVALID,
 			.dirty = false,
-			.parentid = bid,
+			.parentid = b->batCacheid,
 			.farmid = BBPselectfarm(PERSISTENT, b->ttype, varheap),
 		};
 		strconcat_len(b->tvheap->filename, sizeof(b->tvheap->filename),
 			      filename, ".theap", NULL);
-		ATOMIC_INIT(&b->tvheap->refs, 1);
+	} else {
+		b->tvheap = NULL;
 	}
 	return n;
 }
 
+/* read a single line from the BBP.dir file (file pointer fp) and fill
+ * in the structure pointed to by bn and extra information through the
+ * other pointers; this function does not allocate any memory; return 0
+ * on end of file, 1 on success, and -1 on failure */
+static int
+BBPreadBBPline(FILE *fp, unsigned bbpversion, int *lineno, BAT *bn,
+	       int *hashash,
+	       char *batname, char *filename, char **options)
+{
+	char buf[4096];
+	uint64_t batid;
+	uint16_t status;
+	unsigned int properties;
+	int nread, n;
+	char *s;
+	uint64_t count, capacity = 0, base = 0;
+
+	if (fgets(buf, sizeof(buf), fp) == NULL) {
+		if (ferror(fp)) {
+			TRC_CRITICAL(GDK, "error reading BBP.dir on line %d\n", *lineno);
+			return -1;
+		}
+		return 0;	/* end of file */
+	}
+	(*lineno)++;
+	if ((s = strchr(buf, '\r')) != NULL) {
+		/* convert \r\n into just \n */
+		if (s[1] != '\n') {
+			TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", *lineno);
+			return -1;
+		}
+		*s++ = '\n';
+		*s = 0;
+	}
+
+	if (sscanf(buf,
+		   "%" SCNu64 " %" SCNu16 " %128s %19s %u %" SCNu64
+		   " %" SCNu64 " %" SCNu64
+		   "%n",
+		   &batid, &status, batname, filename,
+		   &properties,
+		   &count, &capacity, &base,
+		   &nread) < 8) {
+		TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", *lineno);
+		return -1;
+	}
+
+	if (batid >= N_BBPINIT * BBPINIT) {
+		TRC_CRITICAL(GDK, "bat ID (%" PRIu64 ") too large to accomodate (max %d), on line %d.", batid, N_BBPINIT * BBPINIT - 1, *lineno);
+		return -1;
+	}
+
+	/* convert both / and \ path separators to our own DIR_SEP */
+#if DIR_SEP != '/'
+	s = filename;
+	while ((s = strchr(s, '/')) != NULL)
+		*s++ = DIR_SEP;
+#endif
+#if DIR_SEP != '\\'
+	s = filename;
+	while ((s = strchr(s, '\\')) != NULL)
+		*s++ = DIR_SEP;
+#endif
+
+	bn->batCacheid = (bat) batid;
+	BATinit_idents(bn);
+	bn->batTransient = false;
+	bn->batCopiedtodisk = true;
+	switch ((properties & 0x06) >> 1) {
+	case 0:
+		bn->batRestricted = BAT_WRITE;
+		break;
+	case 1:
+		bn->batRestricted = BAT_READ;
+		break;
+	case 2:
+		bn->batRestricted = BAT_APPEND;
+		break;
+	default:
+		TRC_CRITICAL(GDK, "incorrect batRestricted value");
+		return -1;
+	}
+	bn->batCount = (BUN) count;
+	bn->batInserted = bn->batCount;
+	/* set capacity to at least count */
+	bn->batCapacity = (BUN) count <= BATTINY ? BATTINY : (BUN) count;
+
+	if (base > (uint64_t) GDK_oid_max) {
+		TRC_CRITICAL(GDK, "head seqbase out of range (ID = %" PRIu64 ", seq = %" PRIu64 ") on line %d.", batid, base, *lineno);
+		return -1;
+	}
+	bn->hseqbase = (oid) base;
+	n = heapinit(bn, buf + nread,
+		     hashash,
+		     bbpversion, filename, *lineno);
+	if (n < 0) {
+		return -1;
+	}
+	nread += n;
+	n = vheapinit(bn, buf + nread, *hashash, filename, *lineno);
+	if (n < 0) {
+		return -1;
+	}
+	nread += n;
+
+	if (buf[nread] != '\n' && buf[nread] != ' ') {
+		TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", *lineno);
+		return -1;
+	}
+	*options = (buf[nread] == ' ') ? buf + nread + 1 : NULL;
+	return 1;
+}
+
 static gdk_return
 BBPreadEntries(FILE *fp, unsigned bbpversion, int lineno)
 {
-	bat bid = 0;
-	char buf[4096];
-
 	/* read the BBP.dir and insert the BATs into the BBP */
-	while (fgets(buf, sizeof(buf), fp) != NULL) {
-		BAT *bn;
-		uint64_t batid;
-		uint16_t status;
+	for (;;) {
+		BAT b;
+		Heap h;
+		Heap vh;
+		vh = h = (Heap) {
+			.free = 0,
+		};
+		b = (BAT) {
+			.theap = &h,
+			.tvheap = &vh,
+		};
+		char *options;
 		char headname[129];
 		char filename[sizeof(BBP_physical(0))];
-		unsigned int properties;
-		int nread, n;
-		char *s, *options = NULL;
 		char logical[1024];
-		uint64_t count, capacity, base = 0;
 		int Thashash;
 
-		lineno++;
-		if ((s = strchr(buf, '\r')) != NULL) {
-			/* convert \r\n into just \n */
-			if (s[1] != '\n') {
-				TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno);
-				return GDK_FAIL;
-			}
-			*s++ = '\n';
-			*s = 0;
-		}
-
-		if (sscanf(buf,
-			   "%" SCNu64 " %" SCNu16 " %128s %19s %u %" SCNu64
-			   " %" SCNu64 " %" SCNu64
-			   "%n",
-			   &batid, &status, headname, filename,
-			   &properties,
-			   &count, &capacity, &base,
-			   &nread) < 8) {
-			TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno);
-			return GDK_FAIL;
+		switch (BBPreadBBPline(fp, bbpversion, &lineno, &b,
+				       &Thashash,
+				       headname, filename, &options)) {
+		case 0:
+			/* end of file */
+			return GDK_SUCCEED;
+		case 1:
+			/* successfully read an entry */
+			break;
+		default:
+			/* error */
+			goto bailout;
 		}
 
-		if (batid >= N_BBPINIT * BBPINIT) {
-			TRC_CRITICAL(GDK, "bat ID (%" PRIu64 ") too large to accomodate (max %d), on line %d.", batid, N_BBPINIT * BBPINIT - 1, lineno);
-			return GDK_FAIL;
+		if (b.batCacheid >= N_BBPINIT * BBPINIT) {
+			TRC_CRITICAL(GDK, "bat ID (%d) too large to accommodate (max %d), on line %d.", b.batCacheid, N_BBPINIT * BBPINIT - 1, lineno);
+			goto bailout;
+		}
+
+		if (b.batCacheid >= (bat) ATOMIC_GET(&BBPsize)) {
+			if ((bat) ATOMIC_GET(&BBPsize) + 1 >= BBPlimit &&
+			    BBPextend(0, false, b.batCacheid + 1) != GDK_SUCCEED)
+				goto bailout;
+			ATOMIC_SET(&BBPsize, b.batCacheid + 1);
 		}
-
-		/* convert both / and \ path separators to our own DIR_SEP */
-#if DIR_SEP != '/'
-		s = filename;
-		while ((s = strchr(s, '/')) != NULL)
-			*s++ = DIR_SEP;
-#endif
-#if DIR_SEP != '\\'
-		s = filename;
-		while ((s = strchr(s, '\\')) != NULL)
-			*s++ = DIR_SEP;
-#endif
-
-		bid = (bat) batid;
-		if (batid >= (uint64_t) ATOMIC_GET(&BBPsize)) {
-			if ((bat) ATOMIC_GET(&BBPsize) + 1 >= BBPlimit &&
-			    BBPextend(0, false, bid + 1) != GDK_SUCCEED)
-				return GDK_FAIL;
-			ATOMIC_SET(&BBPsize, bid + 1);
+		if (BBP_desc(b.batCacheid) != NULL) {
+			TRC_CRITICAL(GDK, "duplicate entry in BBP.dir (ID = "
+				     "%d) on line %d.", b.batCacheid, lineno);
+			goto bailout;
 		}
-		if (BBP_desc(bid) != NULL) {
-			TRC_CRITICAL(GDK, "duplicate entry in BBP.dir (ID = "
-				     "%" PRIu64 ") on line %d.", batid, lineno);
-			return GDK_FAIL;
-		}
+
+		BAT *bn;
+		Heap *hn;
 		if ((bn = GDKzalloc(sizeof(BAT))) == NULL ||
-		    (bn->theap = GDKzalloc(sizeof(Heap))) == NULL) {
+		    (hn = GDKzalloc(sizeof(Heap))) == NULL) {
 			GDKfree(bn);
 			TRC_CRITICAL(GDK, "cannot allocate memory for BAT.");
-			return GDK_FAIL;
+			goto bailout;
 		}
-		bn->batCacheid = bid;
-		if (BATroles(bn, NULL) != GDK_SUCCEED) {
-			GDKfree(bn->theap);
+		*bn = b;
+		*hn = h;
+		bn->theap = hn;
+		if (options &&
+		    (options = GDKstrdup(options)) == NULL) {
+			GDKfree(hn);
 			GDKfree(bn);
-			TRC_CRITICAL(GDK, "BATroles failed.");
-			return GDK_FAIL;
+			PROPdestroy_nolock(&b);
+			TRC_CRITICAL(GDK, "GDKstrdup failed\n");
+			goto bailout;
 		}
-		bn->batTransient = false;
-		bn->batCopiedtodisk = true;
-		bn->batRestricted = (properties & 0x06) >> 1;
-		bn->batCount = (BUN) count;
-		bn->batInserted = bn->batCount;
-		/* set capacity to at least count */
-		bn->batCapacity = (BUN) count <= BATTINY ? BATTINY : (BUN) count;
+		if (b.tvheap) {
+			Heap *vhn;
+			assert(b.tvheap == &vh);
+			if ((vhn = GDKmalloc(sizeof(Heap))) == NULL) {
+				GDKfree(hn);
+				GDKfree(bn);
+				GDKfree(options);
+				TRC_CRITICAL(GDK, "cannot allocate memory for BAT.");
+				goto bailout;
+			}
+			*vhn = vh;
+			bn->tvheap = vhn;
+			ATOMIC_INIT(&bn->tvheap->refs, 1);
+		}
+
 		char name[MT_NAME_LEN];
 		snprintf(name, sizeof(name), "heaplock%d", bn->batCacheid); /* fits */
 		MT_lock_init(&bn->theaplock, name);
@@ -688,79 +791,46 @@ BBPreadEntries(FILE *fp, unsigned bbpver
 		MT_rwlock_init(&bn->thashlock, name);
 		ATOMIC_INIT(&bn->theap->refs, 1);
 
-		if (base > (uint64_t) GDK_oid_max) {
-			BATdestroy(bn);
-			TRC_CRITICAL(GDK, "head seqbase out of range (ID = %" PRIu64 ", seq = %" PRIu64 ") on line %d.", batid, base, lineno);
-			return GDK_FAIL;
-		}
-		bn->hseqbase = (oid) base;
-		n = heapinit(bn, buf + nread, &Thashash, bbpversion, bid, filename, lineno);
-		if (n < 0) {
-			BATdestroy(bn);
-			return GDK_FAIL;
-		}
-		nread += n;
-		n = vheapinit(bn, buf + nread, Thashash, bid, filename, lineno);
-		if (n < 0) {
-			BATdestroy(bn);
-			return GDK_FAIL;
-		}
-		nread += n;
-
-		if (buf[nread] != '\n' && buf[nread] != ' ') {
-			BATdestroy(bn);
-			TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno);
-			return GDK_FAIL;
-		}
-		if (buf[nread] == ' ')
-			options = buf + nread + 1;
-
-		if (snprintf(BBP_bak(bid), sizeof(BBP_bak(bid)), "tmp_%o", (unsigned) bid) >= (int) sizeof(BBP_bak(bid))) {
+		if (snprintf(BBP_bak(b.batCacheid), sizeof(BBP_bak(b.batCacheid)), "tmp_%o", (unsigned) b.batCacheid) >= (int) sizeof(BBP_bak(b.batCacheid))) {
 			BATdestroy(bn);
 			TRC_CRITICAL(GDK, "BBP logical filename directory is too large, on line %d\n", lineno);
-			return GDK_FAIL;
+			goto bailout;
 		}
+		char *s;
 		if ((s = strchr(headname, '~')) != NULL && s == headname) {
-			/* sizeof(logical) > sizeof(BBP_bak(bid)), so
+			/* sizeof(logical) > sizeof(BBP_bak(b.batCacheid)), so
 			 * this fits */
-			strcpy(logical, BBP_bak(bid));
+			strcpy(logical, BBP_bak(b.batCacheid));
 		} else {
 			if (s)
 				*s = 0;
 			strcpy_len(logical, headname, sizeof(logical));
 		}
-		if (strcmp(logical, BBP_bak(bid)) == 0) {
-			BBP_logical(bid) = BBP_bak(bid);
+		if (strcmp(logical, BBP_bak(b.batCacheid)) == 0) {
+			BBP_logical(b.batCacheid) = BBP_bak(b.batCacheid);
 		} else {
-			BBP_logical(bid) = GDKstrdup(logical);
-			if (BBP_logical(bid) == NULL) {
+			BBP_logical(b.batCacheid) = GDKstrdup(logical);
+			if (BBP_logical(b.batCacheid) == NULL) {
 				BATdestroy(bn);
 				TRC_CRITICAL(GDK, "GDKstrdup failed\n");
-				return GDK_FAIL;
+				goto bailout;
 			}
 		}
-		/* tailname is ignored */
-		strcpy_len(BBP_physical(bid), filename, sizeof(BBP_physical(bid)));
+		strcpy_len(BBP_physical(b.batCacheid), filename, sizeof(BBP_physical(b.batCacheid)));
 #ifdef __COVERITY__
 		/* help coverity */
-		BBP_physical(bid)[sizeof(BBP_physical(bid)) - 1] = 0;
+		BBP_physical(b.batCacheid)[sizeof(BBP_physical(b.batCacheid)) - 1] = 0;
 #endif
-		BBP_options(bid) = NULL;
-		if (options) {
-			BBP_options(bid) = GDKstrdup(options);
-			if (BBP_options(bid) == NULL) {
-				BATdestroy(bn);
-				TRC_CRITICAL(GDK, "GDKstrdup failed\n");
-				return GDK_FAIL;
-			}
-		}
-		BBP_refs(bid) = 0;
-		BBP_lrefs(bid) = 1;	/* any BAT we encounter here is persistent, so has a logical reference */
-		BBP_desc(bid) = bn;
-		BBP_pid(bid) = 0;
-		BBP_status_set(bid, BBPEXISTING);	/* do we need other status bits? */
+		BBP_options(b.batCacheid) = options;
+		BBP_refs(b.batCacheid) = 0;
+		BBP_lrefs(b.batCacheid) = 1;	/* any BAT we encounter here is persistent, so has a logical reference */
+		BBP_desc(b.batCacheid) = bn;
+		BBP_pid(b.batCacheid) = 0;
+		BBP_status_set(b.batCacheid, BBPEXISTING);	/* do we need other status bits? */
 	}
-	return GDK_SUCCEED;
+
+  bailout:
+	return GDK_FAIL;
 }
 
 /* check that the necessary files for all BATs exist and are large
@@ -879,7 +949,7 @@ BBPcheckbats(unsigned bbpversion)
 #endif
 
 static unsigned
-BBPheader(FILE *fp, int *lineno, bat *bbpsize)
+BBPheader(FILE *fp, int *lineno, bat *bbpsize, lng *logno, lng *transid)
 {
 	char buf[BUFSIZ];
 	int sz, ptrsize, oidsize, intsize;
@@ -942,10 +1012,12 @@ BBPheader(FILE *fp, int *lineno, bat *bb
 			TRC_CRITICAL(GDK, "short BBP");
 			return 0;
 		}
-		if (sscanf(buf, "BBPinfo=" LLSCN " " LLSCN, &BBPlogno, &BBPtransid) != 2) {
+		if (sscanf(buf, "BBPinfo=" LLSCN " " LLSCN, logno, transid) != 2) {
 			TRC_CRITICAL(GDK, "no info value found\n");
 			return 0;
 		}
+	} else {
+		*logno = *transid = 0;
 	}
 	return bbpversion;
 }
@@ -1300,11 +1372,16 @@ BBPinit(bool first)
 	if (GDKinmemory(0)) {
 		bbpversion = GDKLIBRARY;
 	} else {
-		bbpversion = BBPheader(fp, &lineno, &bbpsize);
+		lng logno, transid;
+		bbpversion = BBPheader(fp, &lineno, &bbpsize, &logno, &transid);
 		if (bbpversion == 0) {
 			GDKdebug = dbg;
 			return GDK_FAIL;
 		}
+		assert(bbpversion > GDKLIBRARY_MINMAX_POS || logno == 0);
+		assert(bbpversion > GDKLIBRARY_MINMAX_POS || transid == 0);
+		ATOMIC_SET(&BBPlogno, logno);
+		ATOMIC_SET(&BBPtransid, transid);
 	}
 
 	/* allocate BBP records */
@@ -1534,22 +1611,6 @@ heap_entry(FILE *fp, BATiter *bi, BUN si
 			free = 0;
 	}
 
-	if ((GDKdebug & TAILCHKMASK) && free > 0) {
-		char *fname = GDKfilepath(0, BATDIR, BBP_physical(b->batCacheid), gettailnamebi(bi));
-		if (fname != NULL) {
-			struct stat stb;
-			if (stat(fname, &stb) == -1) {
-				assert(0);
-				TRC_WARNING(GDK, "file %s not found (expected size %zu)\n", fname, free);
-			} else {
-				assert((size_t) stb.st_size >= free);
-				if ((size_t) stb.st_size < free)
-					TRC_WARNING(GDK, "file %s too small (expected %zu, actual %zu)\n", fname, free, (size_t) stb.st_size);
-			}
-			GDKfree(fname);
-		}
-	}
-
 	return fprintf(fp, " %s %d %d %d " BUNFMT " " BUNFMT " " BUNFMT " "
 		       BUNFMT " " OIDFMT " %zu %zu %d " OIDFMT " " OIDFMT,
 		       bi->type >= 0 ? BATatoms[bi->type].name : ATOMunknown_name(bi->type),
@@ -1579,20 +1640,6 @@ vheap_entry(FILE *fp, BATiter *bi, BUN s
 	(void) size;
 	if (bi->vh == NULL)
 		return 0;
-	if ((GDKdebug & TAILCHKMASK) && size > 0) {
-		char *fname = GDKfilepath(0, BATDIR, BBP_physical(bi->vh->parentid), "theap");
-		if (fname != NULL) {
-			struct stat stb;
-			if (stat(fname, &stb) == -1) {
-				assert(0);
-				TRC_WARNING(GDK, "file %s not found (expected size %zu)\n", fname, bi->vhfree);
-			} else if ((size_t) stb.st_size < bi->vhfree) {
-				/* no assert since this can actually happen */
-				TRC_WARNING(GDK, "file %s too small (expected %zu, actual %zu)\n", fname, bi->vhfree, (size_t) stb.st_size);
-			}
-			GDKfree(fname);
-		}
-	}
 	return fprintf(fp, " %zu %zu %d", bi->vhfree, size == 0 ? 0 : bi->vh->size, 0);
 }
 
@@ -1724,113 +1771,13 @@ BBPdir_first(bool subcommit, lng logno, 
 
 static bat
 BBPdir_step(bat bid, BUN size, int n, char *buf, size_t bufsize,
-	    FILE **obbpfp, FILE *nbbpf, bool subcommit, BATiter *bi,
+	    FILE **obbpfp, FILE *nbbpf, BATiter *bi,
 	    oid minpos, oid maxpos)
 {
 	if (n < -1)		/* safety catch */
 		return n;
 	while (n >= 0 && n < bid) {
 		if (n > 0) {
-			if (GDKdebug & TAILCHKMASK) {
-				uint64_t batid, free, vfree;
-				char filename[sizeof(BBP_physical(0))];
-				char type[33];
-				uint16_t width;
-				char *fname;
-				struct stat stb;
-				switch (sscanf(buf, "%" SCNu64 " %*u %*s %19s %*u %*u %*u %*u %10s %" SCNu16 " %*u %*u %*u %*u %*u %*u %*u %" SCNu64 " %*u %*u %*u %*u %" SCNu64 " %*u %*u",
-					       &batid, filename, type, &width, &free, &vfree)) {
-				case 5:
-					vfree = 0;
-					/* fall through */
-				case 6:
-					assert(batid == (uint64_t) n);
-					if (free == 0)
-						break;
-					const char *tailname = "tail";
-					if (strcmp(type, "str") == 0) {
-						switch (width) {
-						case 1:
-							tailname = "tail1";
-							break;
-						case 2:
-							tailname = "tail2";
-							break;
-#if SIZEOF_VAR_T == 8
-						case 4:
-							tailname = "tail4";
-							break;
-#endif
-						}
-					}
-					if (subcommit) {
-						char base[32];
-						snprintf(base, sizeof(base), "%" PRIo64, batid);
-						fname = GDKfilepath(0, BAKDIR, base, tailname);
-					} else {
-						fname = GDKfilepath(0, BATDIR, filename, tailname);
-					}
-					if (fname == NULL)
-						break;
-					bool found = true;
-					if (stat(fname, &stb) == -1) {
-						if (subcommit) {
-							char *fname1 = GDKfilepath(0, BATDIR, filename, tailname);
-							if (fname1 == NULL) {
-								GDKfree(fname);
-								break;
-							}
-							if (stat(fname1, &stb) == -1) {
-								assert(0);
-								found = false;
-								GDKfree(fname1);
-							} else {
-								GDKfree(fname);
-								fname = fname1;
-							}
-						} else {
-							assert(0);
-							found = false;
-						}
-					}
-					if (!found) {
-						TRC_WARNING(GDK, "file %s not found (expected size %" PRIu64 ")\n", fname, free);
-					} else {
-						assert((uint64_t) stb.st_size >= free);
-						if ((uint64_t) stb.st_size < free)
-							TRC_WARNING(GDK, "file %s too small (expected %" PRIu64 ", actual %zu)\n", fname, free, (size_t) stb.st_size);
-					}
-					GDKfree(fname);
-					if (vfree == 0)
-						break;
-					if (subcommit) {
-						char base[32];
-						snprintf(base, sizeof(base), "%" PRIo64, batid);
-						fname = GDKfilepath(0, BAKDIR, base, "theap");
-					} else {
-						fname = GDKfilepath(0, BATDIR, filename, "theap");
-					}
-					if (fname == NULL)
-						break;
-					if (stat(fname, &stb) == -1) {
-						if (subcommit) {
-							GDKfree(fname);
-							fname = GDKfilepath(0, BATDIR, filename, "theap");
-							if (fname == NULL)
-								break;
-							if (stat(fname, &stb) == -1)
-								assert(0);
-						} else {
-							assert(0);
-						}
-					}
-					assert((uint64_t) stb.st_size >= vfree);
-					if ((uint64_t) stb.st_size < vfree)
-						TRC_WARNING(GDK, "file %s too small (expected %" PRIu64 ", actual %zu)\n", fname, vfree, (size_t) stb.st_size);
-					GDKfree(fname);
-					break;
-				}
-			}
 			if (fputs(buf, nbbpf) == EOF) {
 				GDKerror("Writing BBP.dir file failed.\n");
 				goto bailout;
@@ -3553,6 +3500,122 @@ BBPbackup(BAT *b, bool subcommit)
 	return GDK_FAIL;
 }
 
+static inline void
+BBPcheckHeap(bool subcommit, Heap *h)
+{
+	struct stat statb;
+	char *path;
+
+	if (subcommit) {
+		char *s = strrchr(h->filename, DIR_SEP);
+		if (s)
+			s++;
+		else
+			s = h->filename;
+		path = GDKfilepath(0, BAKDIR, s, NULL);
+		if (path == NULL)
+			return;
+		if (MT_stat(path, &statb) < 0) {
+			GDKfree(path);
+			path = GDKfilepath(0, BATDIR, h->filename, NULL);
+			if (path == NULL)
+				return;
+			if (MT_stat(path, &statb) < 0) {
+				assert(0);
+				GDKsyserror("cannot stat file %s (expected size %zu)\n",
+					    path, h->free);
+				GDKfree(path);
+				return;
+			}
+		}
+	} else {
+		path = GDKfilepath(0, BATDIR, h->filename, NULL);
+		if (path == NULL)
+			return;
+		if (MT_stat(path, &statb) < 0) {
+			assert(0);
+			GDKsyserror("cannot stat file %s (expected size %zu)\n",
+				    path, h->free);
+			GDKfree(path);
+			return;
+		}
+	}
+	assert((statb.st_mode & S_IFMT) == S_IFREG);
+	assert((size_t) statb.st_size >= h->free);
+	if ((size_t) statb.st_size < h->free) {
+		GDKerror("file %s too small (expected %zu, actual %zu)\n", path, h->free, (size_t) statb.st_size);
+		GDKfree(path);
+		return;
+	}
+	GDKfree(path);
+}
+
+static void
+BBPcheckBBPdir(bool subcommit)
+{
+	FILE *fp;
+	int lineno = 0;
+	bat bbpsize = 0;
+	unsigned bbpversion;
+	lng logno, transid;
+
+	fp = GDKfileopen(0, BATDIR, "BBP", "dir", "r");
+	assert(fp != NULL);
+	if (fp == NULL)
+		return;
+	bbpversion = BBPheader(fp, &lineno, &bbpsize, &logno, &transid);
+	if (bbpversion == 0) {
+		fclose(fp);
+		return;		/* error reading file */
+	}
+	assert(bbpversion == GDKLIBRARY);
+
+	for (;;) {
+		BAT b;
+		Heap h;
+		Heap vh;
+		vh = h = (Heap) {
+			.free = 0,
+		};
+		b = (BAT) {
+			.theap = &h,
+			.tvheap = &vh,
+		};
+		char *options;
+		char filename[sizeof(BBP_physical(0))];
+		char batname[129];
+		int hashash;
+
+		switch (BBPreadBBPline(fp, bbpversion, &lineno, &b,
+				       &hashash,
+				       batname, filename, &options)) {
+		case 0:
+			/* end of file */
+			fclose(fp);
+			return;
+		case 1:
+			/* successfully read an entry */
+			break;
+		default:
+			/* error */
+			fclose(fp);
+			return;
+		}
+		assert(hashash == 0);
+		assert(b.batCacheid < (bat) ATOMIC_GET(&BBPsize));
+		assert(BBP_desc(b.batCacheid) != NULL);
+		assert(b.hseqbase <= GDK_oid_max);
+		if (b.ttype == TYPE_void) {
+			/* no files needed */
+			continue;
+		}
+		if (b.theap->free > 0)
+			BBPcheckHeap(subcommit, b.theap);
+		if (b.tvheap != NULL && b.tvheap->free > 0)
+			BBPcheckHeap(subcommit, b.tvheap);
+	}
+}
+
 /*
  * @+ Atomic Write
  * The atomic BBPsync() function first safeguards the old images of
@@ -3730,7 +3793,7 @@ BBPsync(int cnt, bat *restrict subcommit
 				bi = bat_iterator(NULL);
 			}
 			if (ret == GDK_SUCCEED) {
-				n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf, nbbpf, subcommit != NULL, &bi, (BUN) minpos, (BUN) maxpos);
+				n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf, nbbpf, &bi, (BUN) minpos, (BUN) maxpos);
 			}
 			bat_iterator_end(&bi);
 			if (n == -2)
@@ -3757,6 +3820,9 @@ BBPsync(int cnt, bat *restrict subcommit
 		 * succeeded, so no changing of ret after this
 		 * call anymore */
 
+		if ((GDKdebug & TAILCHKMASK) && !GDKinmemory(0))
+			BBPcheckBBPdir(subcommit != NULL);
+
 		if (MT_rename(bakdir, deldir) < 0 &&
 		    /* maybe there was an old deldir, so remove and try again */
 		    (GDKremovedir(0, DELDIR) != GDK_SUCCEED ||