changeset 86182:dea28f85232a Jan2022

Merge with Jul2021 branch.
author Sjoerd Mullender <sjoerd@acm.org>
date Thu, 28 Jul 2022 10:44:33 +0200
parents aa86c05de2ff (current diff) ede5229fbf99 (diff)
children daeb22fc015d
files gdk/gdk_bat.c gdk/gdk_bbp.c gdk/gdk_calc.c gdk/gdk_delta.c gdk/gdk_hash.c gdk/gdk_heap.c gdk/gdk_imprints.c gdk/gdk_orderidx.c gdk/gdk_project.c monetdb5/modules/kernel/batstr.c monetdb5/modules/mal/tablet.c
diffstat 11 files changed, 372 insertions(+), 289 deletions(-) [+]
line wrap: on
line diff
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -899,8 +899,7 @@ COLcopy(BAT *b, int tt, bool writable, r
 			/* convert number of bits to number of bytes,
 			 * and round the latter up to a multiple of
 			 * 4 (copy in units of 4 bytes) */
-			bn->theap->free = (bi.count + 7) / 8;
-			bn->theap->free = (bn->theap->free + 3) & ~(size_t)3;
+			bn->theap->free = ((bi.count + 31) / 32) * 4;
 			bn->theap->dirty |= bi.count > 0;
 			memcpy(Tloc(bn, 0), bi.base, bn->theap->free);
 		} else {
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -433,7 +433,7 @@ heapinit(BAT *b, const char *buf,
 #ifdef GDKLIBRARY_HASHASH
 	 int *hashash,
 #endif
-	 unsigned bbpversion, bat bid, const char *filename, int lineno)
+	 unsigned bbpversion, const char *filename, int lineno)
 {
 	int t;
 	char type[33];
@@ -495,7 +495,7 @@ heapinit(BAT *b, const char *buf,
 			return -1;
 		}
 	} else if (var != (t == TYPE_void || BATatoms[t].atomPut != NULL)) {
-		TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tvarsized mismatch for BAT %d on line %d\n", (int) bid, lineno);
+		TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tvarsized mismatch for BAT %d on line %d\n", (int) b->batCacheid, lineno);
 		return -1;
 	} else if (var && t != 0 ?
 		   ATOMsize(t) < width ||
@@ -505,7 +505,7 @@ heapinit(BAT *b, const char *buf,
 #endif
 			   ) :
 		   ATOMsize(t) != width) {
-		TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tsize mismatch for BAT %d on line %d\n", (int) bid, lineno);
+		TRC_CRITICAL(GDK, "inconsistent entry in BBP.dir: tsize mismatch for BAT %d on line %d\n", (int) b->batCacheid, lineno);
 		return -1;
 	}
 	b->ttype = t;
@@ -553,7 +553,7 @@ heapinit(BAT *b, const char *buf,
 }
 
 static int
-vheapinit(BAT *b, const char *buf, bat bid, const char *filename, int lineno)
+vheapinit(BAT *b, const char *buf, const char *filename, int lineno)
 {
 	int n = 0;
 	uint64_t free, size;
@@ -567,11 +567,6 @@ vheapinit(BAT *b, const char *buf, bat b
 			TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno);
 			return -1;
 		}
-		b->tvheap = GDKmalloc(sizeof(Heap));
-		if (b->tvheap == NULL) {
-			TRC_CRITICAL(GDK, "cannot allocate memory for heap.");
-			return -1;
-		}
 		if (b->batCount == 0)
 			free = 0;
 		if (b->ttype >= 0 &&
@@ -590,16 +585,134 @@ vheapinit(BAT *b, const char *buf, bat b
 			.cleanhash = true,
 			.newstorage = STORE_INVALID,
 			.dirty = false,
-			.parentid = bid,
+			.parentid = b->batCacheid,
 			.farmid = BBPselectfarm(PERSISTENT, b->ttype, varheap),
 		};
 		strconcat_len(b->tvheap->filename, sizeof(b->tvheap->filename),
 			      filename, ".theap", NULL);
-		ATOMIC_INIT(&b->tvheap->refs, 1);
+	} else {
+		b->tvheap = NULL;
 	}
 	return n;
 }
 
+/* read a single line from the BBP.dir file (file pointer fp) and fill
+ * in the structure pointed to by bn and extra information through the
+ * other pointers; this function does not allocate any memory; return 0
+ * on end of file, 1 on success, and -1 on failure */
+static int
+BBPreadBBPline(FILE *fp, unsigned bbpversion, int *lineno, BAT *bn,
+#ifdef GDKLIBRARY_HASHASH
+	       int *hashash,
+#endif
+	       char *batname, char *filename, char **options)
+{
+	char buf[4096];
+	uint64_t batid;
+	uint16_t status;
+	unsigned int properties;
+	int nread, n;
+	char *s;
+	uint64_t count, capacity = 0, base = 0;
+
+	if (fgets(buf, sizeof(buf), fp) == NULL) {
+		if (ferror(fp)) {
+			TRC_CRITICAL(GDK, "error reading BBP.dir on line %d\n", *lineno);
+			return -1;
+		}
+		return 0;	/* end of file */
+	}
+	(*lineno)++;
+	if ((s = strchr(buf, '\r')) != NULL) {
+		/* convert \r\n into just \n */
+		if (s[1] != '\n') {
+			TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", *lineno);
+			return -1;
+		}
+		*s++ = '\n';
+		*s = 0;
+	}
+
+	if (sscanf(buf,
+		   "%" SCNu64 " %" SCNu16 " %128s %19s %u %" SCNu64
+		   " %" SCNu64 " %" SCNu64
+		   "%n",
+		   &batid, &status, batname, filename,
+		   &properties,
+		   &count, &capacity, &base,
+		   &nread) < 8) {
+		TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", *lineno);
+		return -1;
+	}
+
+	if (batid >= N_BBPINIT * BBPINIT) {
+		TRC_CRITICAL(GDK, "bat ID (%" PRIu64 ") too large to accomodate (max %d), on line %d.", batid, N_BBPINIT * BBPINIT - 1, *lineno);
+		return -1;
+	}
+
+	/* convert both / and \ path separators to our own DIR_SEP */
+#if DIR_SEP != '/'
+	s = filename;
+	while ((s = strchr(s, '/')) != NULL)
+		*s++ = DIR_SEP;
+#endif
+#if DIR_SEP != '\\'
+	s = filename;
+	while ((s = strchr(s, '\\')) != NULL)
+		*s++ = DIR_SEP;
+#endif
+
+	bn->batCacheid = (bat) batid;
+	BATinit_idents(bn);
+	bn->batTransient = false;
+	bn->batCopiedtodisk = true;
+	switch ((properties & 0x06) >> 1) {
+	case 0:
+		bn->batRestricted = BAT_WRITE;
+		break;
+	case 1:
+		bn->batRestricted = BAT_READ;
+		break;
+	case 2:
+		bn->batRestricted = BAT_APPEND;
+		break;
+	default:
+		TRC_CRITICAL(GDK, "incorrect batRestricted value");
+		return -1;
+	}
+	bn->batCount = (BUN) count;
+	bn->batInserted = bn->batCount;
+	/* set capacity to at least count */
+	bn->batCapacity = (BUN) count <= BATTINY ? BATTINY : (BUN) count;
+
+	if (base > (uint64_t) GDK_oid_max) {
+		TRC_CRITICAL(GDK, "head seqbase out of range (ID = %" PRIu64 ", seq = %" PRIu64 ") on line %d.", batid, base, *lineno);
+		return -1;
+	}
+	bn->hseqbase = (oid) base;
+	n = heapinit(bn, buf + nread,
+#ifdef GDKLIBRARY_HASHASH
+		     hashash,
+#endif
+		     bbpversion, filename, *lineno);
+	if (n < 0) {
+		return -1;
+	}
+	nread += n;
+	n = vheapinit(bn, buf + nread, filename, *lineno);
+	if (n < 0) {
+		return -1;
+	}
+	nread += n;
+
+	if (buf[nread] != '\n' && buf[nread] != ' ') {
+		TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", *lineno);
+		return -1;
+	}
+	*options = (buf[nread] == ' ') ? buf + nread + 1 : NULL;
+	return 1;
+}
+
 static gdk_return
 BBPreadEntries(FILE *fp, unsigned bbpversion, int lineno
 #ifdef GDKLIBRARY_HASHASH
@@ -607,101 +720,114 @@ BBPreadEntries(FILE *fp, unsigned bbpver
 #endif
 	)
 {
-	bat bid = 0;
-	char buf[4096];
 #ifdef GDKLIBRARY_HASHASH
 	bat *hbats = NULL;
 	bat nhbats = 0;
 #endif
 
 	/* read the BBP.dir and insert the BATs into the BBP */
-	while (fgets(buf, sizeof(buf), fp) != NULL) {
-		BAT *bn;
-		uint64_t batid;
-		uint16_t status;
+	for (;;) {
+		BAT b;
+		Heap h;
+		Heap vh;
+		vh = h = (Heap) {
+			.free = 0,
+		};
+		b = (BAT) {
+			.theap = &h,
+			.tvheap = &vh,
+		};
+		char *options;
 		char headname[129];
 		char filename[sizeof(BBP_physical(0))];
-		unsigned int properties;
-		int nread, n;
-		char *s, *options = NULL;
 		char logical[1024];
-		uint64_t count, capacity, base = 0;
 #ifdef GDKLIBRARY_HASHASH
 		int Thashash;
 #endif
 
-		lineno++;
-		if ((s = strchr(buf, '\r')) != NULL) {
-			/* convert \r\n into just \n */
-			if (s[1] != '\n') {
-				TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno);
-				goto bailout;
-			}
-			*s++ = '\n';
-			*s = 0;
-		}
-
-		if (sscanf(buf,
-			   "%" SCNu64 " %" SCNu16 " %128s %19s %u %" SCNu64
-			   " %" SCNu64 " %" SCNu64
-			   "%n",
-			   &batid, &status, headname, filename,
-			   &properties,
-			   &count, &capacity, &base,
-			   &nread) < 8) {
-			TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno);
+		switch (BBPreadBBPline(fp, bbpversion, &lineno, &b,
+#ifdef GDKLIBRARY_HASHASH
+				       &Thashash,
+#endif
+				       headname, filename, &options)) {
+		case 0:
+			/* end of file */
+#ifdef GDKLIBRARY_HASHASH
+			*hashbats = hbats;
+			*nhashbats = nhbats;
+#endif
+			return GDK_SUCCEED;
+		case 1:
+			/* successfully read an entry */
+			break;
+		default:
+			/* error */
 			goto bailout;
 		}
 
-		if (batid >= N_BBPINIT * BBPINIT) {
-			TRC_CRITICAL(GDK, "bat ID (%" PRIu64 ") too large to accomodate (max %d), on line %d.", batid, N_BBPINIT * BBPINIT - 1, lineno);
+		if (b.batCacheid >= N_BBPINIT * BBPINIT) {
+			TRC_CRITICAL(GDK, "bat ID (%d) too large to accommodate (max %d), on line %d.", b.batCacheid, N_BBPINIT * BBPINIT - 1, lineno);
 			goto bailout;
 		}
 
-		/* convert both / and \ path separators to our own DIR_SEP */
-#if DIR_SEP != '/'
-		s = filename;
-		while ((s = strchr(s, '/')) != NULL)
-			*s++ = DIR_SEP;
-#endif
-#if DIR_SEP != '\\'
-		s = filename;
-		while ((s = strchr(s, '\\')) != NULL)
-			*s++ = DIR_SEP;
-#endif
-
-		bid = (bat) batid;
-		if (batid >= (uint64_t) ATOMIC_GET(&BBPsize)) {
+		if (b.batCacheid >= (bat) ATOMIC_GET(&BBPsize)) {
 			if ((bat) ATOMIC_GET(&BBPsize) + 1 >= BBPlimit &&
-			    BBPextend(0, false, bid + 1) != GDK_SUCCEED)
+			    BBPextend(0, false, b.batCacheid + 1) != GDK_SUCCEED)
 				goto bailout;
-			ATOMIC_SET(&BBPsize, bid + 1);
+			ATOMIC_SET(&BBPsize, b.batCacheid + 1);
 		}
-		if (BBP_desc(bid) != NULL) {
+		if (BBP_desc(b.batCacheid) != NULL) {
 			TRC_CRITICAL(GDK, "duplicate entry in BBP.dir (ID = "
-				     "%" PRIu64 ") on line %d.", batid, lineno);
+				     "%d) on line %d.", b.batCacheid, lineno);
 			goto bailout;
 		}
+
+#ifdef GDKLIBRARY_HASHASH
+		if (Thashash) {
+			assert(bbpversion <= GDKLIBRARY_HASHASH);
+			bat *sb = GDKrealloc(hbats, ++nhbats * sizeof(bat));
+			if (sb == NULL) {
+				goto bailout;
+			}
+			hbats = sb;
+			hbats[nhbats - 1] = b.batCacheid;
+		}
+#endif
+
+		BAT *bn;
+		Heap *hn;
 		if ((bn = GDKzalloc(sizeof(BAT))) == NULL ||
-		    (bn->theap = GDKzalloc(sizeof(Heap))) == NULL) {
+		    (hn = GDKzalloc(sizeof(Heap))) == NULL) {
 			GDKfree(bn);
 			TRC_CRITICAL(GDK, "cannot allocate memory for BAT.");
 			goto bailout;
 		}
-		bn->batCacheid = bid;
-		if (BATroles(bn, NULL) != GDK_SUCCEED) {
-			GDKfree(bn->theap);
+		*bn = b;
+		*hn = h;
+		bn->theap = hn;
+		if (options &&
+		    (options = GDKstrdup(options)) == NULL) {
+			GDKfree(hn);
 			GDKfree(bn);
-			TRC_CRITICAL(GDK, "BATroles failed.");
+			PROPdestroy_nolock(&b);
+			TRC_CRITICAL(GDK, "GDKstrdup failed\n");
 			goto bailout;
 		}
-		bn->batTransient = false;
-		bn->batCopiedtodisk = true;
-		bn->batRestricted = (properties & 0x06) >> 1;
-		bn->batCount = (BUN) count;
-		bn->batInserted = bn->batCount;
-		/* set capacity to at least count */
-		bn->batCapacity = (BUN) count <= BATTINY ? BATTINY : (BUN) count;
+		if (b.tvheap) {
+			Heap *vhn;
+			assert(b.tvheap == &vh);
+			if ((vhn = GDKmalloc(sizeof(Heap))) == NULL) {
+				GDKfree(hn);
+				GDKfree(bn);
+				GDKfree(options);
+				TRC_CRITICAL(GDK, "cannot allocate memory for BAT.");
+				goto bailout;
+			}
+			*vhn = vh;
+			bn->tvheap = vhn;
+			ATOMIC_INIT(&bn->tvheap->refs, 1);
+		}
+
 		char name[MT_NAME_LEN];
 		snprintf(name, sizeof(name), "heaplock%d", bn->batCacheid); /* fits */
 		MT_lock_init(&bn->theaplock, name);
@@ -711,99 +837,43 @@ BBPreadEntries(FILE *fp, unsigned bbpver
 		MT_rwlock_init(&bn->thashlock, name);
 		ATOMIC_INIT(&bn->theap->refs, 1);
 
-		if (base > (uint64_t) GDK_oid_max) {
-			BATdestroy(bn);
-			TRC_CRITICAL(GDK, "head seqbase out of range (ID = %" PRIu64 ", seq = %" PRIu64 ") on line %d.", batid, base, lineno);
-			goto bailout;
-		}
-		bn->hseqbase = (oid) base;
-		n = heapinit(bn, buf + nread,
-#ifdef GDKLIBRARY_HASHASH
-			     &Thashash,
-#endif
-			     bbpversion, bid, filename, lineno);
-		if (n < 0) {
-			BATdestroy(bn);
-			goto bailout;
-		}
-		nread += n;
-		n = vheapinit(bn, buf + nread, bid, filename, lineno);
-		if (n < 0) {
-			BATdestroy(bn);
-			goto bailout;
-		}
-		nread += n;
-#ifdef GDKLIBRARY_HASHASH
-		if (Thashash) {
-			assert(bbpversion <= GDKLIBRARY_HASHASH);
-			bat *sb = GDKrealloc(hbats, ++nhbats * sizeof(bat));
-			if (sb == NULL) {
-				BATdestroy(bn);
-				goto bailout;
-			}
-			hbats = sb;
-			hbats[nhbats - 1] = bn->batCacheid;
-		}
-#endif
-
-		if (buf[nread] != '\n' && buf[nread] != ' ') {
-			BATdestroy(bn);
-			TRC_CRITICAL(GDK, "invalid format for BBP.dir on line %d", lineno);
-			goto bailout;
-		}
-		if (buf[nread] == ' ')
-			options = buf + nread + 1;
-
-		if (snprintf(BBP_bak(bid), sizeof(BBP_bak(bid)), "tmp_%o", (unsigned) bid) >= (int) sizeof(BBP_bak(bid))) {
+		if (snprintf(BBP_bak(b.batCacheid), sizeof(BBP_bak(b.batCacheid)), "tmp_%o", (unsigned) b.batCacheid) >= (int) sizeof(BBP_bak(b.batCacheid))) {
 			BATdestroy(bn);
 			TRC_CRITICAL(GDK, "BBP logical filename directory is too large, on line %d\n", lineno);
 			goto bailout;
 		}
+		char *s;
 		if ((s = strchr(headname, '~')) != NULL && s == headname) {
-			/* sizeof(logical) > sizeof(BBP_bak(bid)), so
+			/* sizeof(logical) > sizeof(BBP_bak(b.batCacheid)), so
 			 * this fits */
-			strcpy(logical, BBP_bak(bid));
+			strcpy(logical, BBP_bak(b.batCacheid));
 		} else {
 			if (s)
 				*s = 0;
 			strcpy_len(logical, headname, sizeof(logical));
 		}
-		if (strcmp(logical, BBP_bak(bid)) == 0) {
-			BBP_logical(bid) = BBP_bak(bid);
+		if (strcmp(logical, BBP_bak(b.batCacheid)) == 0) {
+			BBP_logical(b.batCacheid) = BBP_bak(b.batCacheid);
 		} else {
-			BBP_logical(bid) = GDKstrdup(logical);
-			if (BBP_logical(bid) == NULL) {
+			BBP_logical(b.batCacheid) = GDKstrdup(logical);
+			if (BBP_logical(b.batCacheid) == NULL) {
 				BATdestroy(bn);
 				TRC_CRITICAL(GDK, "GDKstrdup failed\n");
 				goto bailout;
 			}
 		}
-		/* tailname is ignored */
-		strcpy_len(BBP_physical(bid), filename, sizeof(BBP_physical(bid)));
+		strcpy_len(BBP_physical(b.batCacheid), filename, sizeof(BBP_physical(b.batCacheid)));
 #ifdef __COVERITY__
 		/* help coverity */
-		BBP_physical(bid)[sizeof(BBP_physical(bid)) - 1] = 0;
+		BBP_physical(b.batCacheid)[sizeof(BBP_physical(b.batCacheid)) - 1] = 0;
 #endif
-		BBP_options(bid) = NULL;
-		if (options) {
-			BBP_options(bid) = GDKstrdup(options);
-			if (BBP_options(bid) == NULL) {
-				BATdestroy(bn);
-				TRC_CRITICAL(GDK, "GDKstrdup failed\n");
-				goto bailout;
-			}
-		}
-		BBP_refs(bid) = 0;
-		BBP_lrefs(bid) = 1;	/* any BAT we encounter here is persistent, so has a logical reference */
-		BBP_desc(bid) = bn;
-		BBP_pid(bid) = 0;
-		BBP_status_set(bid, BBPEXISTING);	/* do we need other status bits? */
+		BBP_options(b.batCacheid) = options;
+		BBP_refs(b.batCacheid) = 0;
+		BBP_lrefs(b.batCacheid) = 1;	/* any BAT we encounter here is persistent, so has a logical reference */
+		BBP_desc(b.batCacheid) = bn;
+		BBP_pid(b.batCacheid) = 0;
+		BBP_status_set(b.batCacheid, BBPEXISTING);	/* do we need other status bits? */
 	}
-#ifdef GDKLIBRARY_HASHASH
-	*hashbats = hbats;
-	*nhashbats = nhbats;
-#endif
-	return GDK_SUCCEED;
 
   bailout:
 #ifdef GDKLIBRARY_HASHASH
@@ -928,7 +998,7 @@ BBPcheckbats(unsigned bbpversion)
 #endif
 
 static unsigned
-BBPheader(FILE *fp, int *lineno, bat *bbpsize)
+BBPheader(FILE *fp, int *lineno, bat *bbpsize, lng *logno, lng *transid)
 {
 	char buf[BUFSIZ];
 	int sz, ptrsize, oidsize, intsize;
@@ -992,10 +1062,12 @@ BBPheader(FILE *fp, int *lineno, bat *bb
 			TRC_CRITICAL(GDK, "short BBP");
 			return 0;
 		}
-		if (sscanf(buf, "BBPinfo=" LLSCN " " LLSCN, &BBPlogno, &BBPtransid) != 2) {
+		if (sscanf(buf, "BBPinfo=" LLSCN " " LLSCN, logno, transid) != 2) {
 			TRC_CRITICAL(GDK, "no info value found\n");
 			return 0;
 		}
+	} else {
+		*logno = *transid = 0;
 	}
 	return bbpversion;
 }
@@ -1582,11 +1654,16 @@ BBPinit(bool first)
 	if (GDKinmemory(0)) {
 		bbpversion = GDKLIBRARY;
 	} else {
-		bbpversion = BBPheader(fp, &lineno, &bbpsize);
+		lng logno, transid;
+		bbpversion = BBPheader(fp, &lineno, &bbpsize, &logno, &transid);
 		if (bbpversion == 0) {
 			GDKdebug = dbg;
 			return GDK_FAIL;
 		}
+		assert(bbpversion > GDKLIBRARY_MINMAX_POS || logno == 0);
+		assert(bbpversion > GDKLIBRARY_MINMAX_POS || transid == 0);
+		ATOMIC_SET(&BBPlogno, logno);
+		ATOMIC_SET(&BBPtransid, transid);
 	}
 
 	/* allocate BBP records */
@@ -1839,22 +1916,6 @@ heap_entry(FILE *fp, BATiter *bi, BUN si
 			free = 0;
 	}
 
-	if ((GDKdebug & TAILCHKMASK) && free > 0) {
-		char *fname = GDKfilepath(0, BATDIR, BBP_physical(b->batCacheid), gettailnamebi(bi));
-		if (fname != NULL) {
-			struct stat stb;
-			if (stat(fname, &stb) == -1) {
-				assert(0);
-				TRC_WARNING(GDK, "file %s not found (expected size %zu)\n", fname, free);
-			} else {
-				assert((size_t) stb.st_size >= free);
-				if ((size_t) stb.st_size < free)
-					TRC_WARNING(GDK, "file %s too small (expected %zu, actual %zu)\n", fname, free, (size_t) stb.st_size);
-			}
-			GDKfree(fname);
-		}
-	}
-
 	return fprintf(fp, " %s %d %d %d " BUNFMT " " BUNFMT " " BUNFMT " "
 		       BUNFMT " " OIDFMT " %zu %zu %d %" PRIu64" %" PRIu64,
 		       bi->type >= 0 ? BATatoms[bi->type].name : ATOMunknown_name(bi->type),
@@ -1884,21 +1945,7 @@ vheap_entry(FILE *fp, BATiter *bi, BUN s
 	(void) size;
 	if (bi->vh == NULL)
 		return 0;
-	if ((GDKdebug & TAILCHKMASK) && size > 0) {
-		char *fname = GDKfilepath(0, BATDIR, BBP_physical(bi->vh->parentid), "theap");
-		if (fname != NULL) {
-			struct stat stb;
-			if (stat(fname, &stb) == -1) {
-				assert(0);
-				TRC_WARNING(GDK, "file %s not found (expected size %zu)\n", fname, bi->vhfree);
-			} else if ((size_t) stb.st_size < bi->vhfree) {
-				/* no assert since this can actually happen */
-				TRC_WARNING(GDK, "file %s too small (expected %zu, actual %zu)\n", fname, bi->vhfree, (size_t) stb.st_size);
-			}
-			GDKfree(fname);
-		}
-	}
-	return fprintf(fp, " %zu %zu %d", bi->vhfree, size == 0 ? 0 : bi->vh->size, 0);
+	return fprintf(fp, " %zu %zu %d", size == 0 ? 0 : bi->vhfree, bi->vh->size, 0);
 }
 
 static gdk_return
@@ -2030,112 +2077,12 @@ BBPdir_first(bool subcommit, lng logno, 
 
 static bat
 BBPdir_step(bat bid, BUN size, int n, char *buf, size_t bufsize,
-	    FILE **obbpfp, FILE *nbbpf, bool subcommit, BATiter *bi)
+	    FILE **obbpfp, FILE *nbbpf, BATiter *bi)
 {
 	if (n < -1)		/* safety catch */
 		return n;
 	while (n >= 0 && n < bid) {
 		if (n > 0) {
-			if (GDKdebug & TAILCHKMASK) {
-				uint64_t batid, free, vfree;
-				char filename[sizeof(BBP_physical(0))];
-				char type[33];
-				uint16_t width;
-				char *fname;
-				struct stat stb;
-				switch (sscanf(buf, "%" SCNu64 " %*u %*s %19s %*u %*u %*u %*u %10s %" SCNu16 " %*u %*u %*u %*u %*u %*u %*u %" SCNu64 " %*u %*u %*u %*u %" SCNu64 " %*u %*u",
-					       &batid, filename, type, &width, &free, &vfree)) {
-				case 5:
-					vfree = 0;
-					/* fall through */
-				case 6:
-					assert(batid == (uint64_t) n);
-					if (free == 0)
-						break;
-					const char *tailname = "tail";
-					if (strcmp(type, "str") == 0) {
-						switch (width) {
-						case 1:
-							tailname = "tail1";
-							break;
-						case 2:
-							tailname = "tail2";
-							break;
-#if SIZEOF_VAR_T == 8
-						case 4:
-							tailname = "tail4";
-							break;
-#endif
-						}
-					}
-					if (subcommit) {
-						char base[32];
-						snprintf(base, sizeof(base), "%" PRIo64, batid);
-						fname = GDKfilepath(0, BAKDIR, base, tailname);
-					} else {
-						fname = GDKfilepath(0, BATDIR, filename, tailname);
-					}
-					if (fname == NULL)
-						break;
-					bool found = true;
-					if (stat(fname, &stb) == -1) {
-						if (subcommit) {
-							char *fname1 = GDKfilepath(0, BATDIR, filename, tailname);
-							if (fname1 == NULL) {
-								GDKfree(fname);
-								break;
-							}
-							if (stat(fname1, &stb) == -1) {
-								assert(0);
-								found = false;
-								GDKfree(fname1);
-							} else {
-								GDKfree(fname);
-								fname = fname1;
-							}
-						} else {
-							assert(0);
-							found = false;
-						}
-					}
-					if (!found) {
-						TRC_WARNING(GDK, "file %s not found (expected size %" PRIu64 ")\n", fname, free);
-					} else {
-						assert((uint64_t) stb.st_size >= free);
-						if ((uint64_t) stb.st_size < free)
-							TRC_WARNING(GDK, "file %s too small (expected %" PRIu64 ", actual %zu)\n", fname, free, (size_t) stb.st_size);
-					}
-					GDKfree(fname);
-					if (vfree == 0)
-						break;
-					if (subcommit) {
-						char base[32];
-						snprintf(base, sizeof(base), "%" PRIo64, batid);
-						fname = GDKfilepath(0, BAKDIR, base, "theap");
-					} else {
-						fname = GDKfilepath(0, BATDIR, filename, "theap");
-					}
-					if (fname == NULL)
-						break;
-					if (stat(fname, &stb) == -1) {
-						if (subcommit) {
-							GDKfree(fname);
-							fname = GDKfilepath(0, BATDIR, filename, "theap");
-							if (fname == NULL)
-								break;
-							if (stat(fname, &stb) == -1)
-								assert(0);
-						} else {
-							assert(0);
-						}
-					}
-					assert((uint64_t) stb.st_size >= vfree);
-					if ((uint64_t) stb.st_size < vfree)
-						TRC_WARNING(GDK, "file %s too small (expected %" PRIu64 ", actual %zu)\n", fname, vfree, (size_t) stb.st_size);
-					GDKfree(fname);
-					break;
-				}
-			}
 			if (fputs(buf, nbbpf) == EOF) {
 				GDKerror("Writing BBP.dir file failed.\n");
 				goto bailout;
@@ -3834,6 +3781,128 @@ BBPbackup(BAT *b, bool subcommit)
 	return GDK_FAIL;
 }
 
+static inline void
+BBPcheckHeap(bool subcommit, Heap *h)
+{
+	struct stat statb;
+	char *path;
+
+	if (subcommit) {
+		char *s = strrchr(h->filename, DIR_SEP);
+		if (s)
+			s++;
+		else
+			s = h->filename;
+		path = GDKfilepath(0, BAKDIR, s, NULL);
+		if (path == NULL)
+			return;
+		if (MT_stat(path, &statb) < 0) {
+			GDKfree(path);
+			path = GDKfilepath(0, BATDIR, h->filename, NULL);
+			if (path == NULL)
+				return;
+			if (MT_stat(path, &statb) < 0) {
+				assert(0);
+				GDKsyserror("cannot stat file %s (expected size %zu)\n",
+					    path, h->free);
+				GDKfree(path);
+				return;
+			}
+		}
+	} else {
+		path = GDKfilepath(0, BATDIR, h->filename, NULL);
+		if (path == NULL)
+			return;
+		if (MT_stat(path, &statb) < 0) {
+			assert(0);
+			GDKsyserror("cannot stat file %s (expected size %zu)\n",
+				    path, h->free);
+			GDKfree(path);
+			return;
+		}
+	}
+	assert((statb.st_mode & S_IFMT) == S_IFREG);
+	assert((size_t) statb.st_size >= h->free);
+	if ((size_t) statb.st_size < h->free) {
+		GDKerror("file %s too small (expected %zu, actual %zu)\n", path, h->free, (size_t) statb.st_size);
+		GDKfree(path);
+		return;
+	}
+	GDKfree(path);
+}
+
+static void
+BBPcheckBBPdir(bool subcommit)
+{
+	FILE *fp;
+	int lineno = 0;
+	bat bbpsize = 0;
+	unsigned bbpversion;
+	lng logno, transid;
+
+	fp = GDKfileopen(0, BATDIR, "BBP", "dir", "r");
+	assert(fp != NULL);
+	if (fp == NULL)
+		return;
+	bbpversion = BBPheader(fp, &lineno, &bbpsize, &logno, &transid);
+	if (bbpversion == 0) {
+		fclose(fp);
+		return;		/* error reading file */
+	}
+	assert(bbpversion == GDKLIBRARY);
+
+	for (;;) {
+		BAT b;
+		Heap h;
+		Heap vh;
+		vh = h = (Heap) {
+			.free = 0,
+		};
+		b = (BAT) {
+			.theap = &h,
+			.tvheap = &vh,
+		};
+		char *options;
+		char filename[sizeof(BBP_physical(0))];
+		char batname[129];
+#ifdef GDKLIBRARY_HASHASH
+		int hashash;
+#endif
+
+		switch (BBPreadBBPline(fp, bbpversion, &lineno, &b,
+#ifdef GDKLIBRARY_HASHASH
+				       &hashash,
+#endif
+				       batname, filename, &options)) {
+		case 0:
+			/* end of file */
+			fclose(fp);
+			return;
+		case 1:
+			/* successfully read an entry */
+			break;
+		default:
+			/* error */
+			fclose(fp);
+			return;
+		}
+#ifdef GDKLIBRARY_HASHASH
+		assert(hashash == 0);
+#endif
+		assert(b.batCacheid < (bat) ATOMIC_GET(&BBPsize));
+		assert(BBP_desc(b.batCacheid) != NULL);
+		assert(b.hseqbase <= GDK_oid_max);
+		if (b.ttype == TYPE_void) {
+			/* no files needed */
+			continue;
+		}
+		if (b.theap->free > 0)
+			BBPcheckHeap(subcommit, b.theap);
+		if (b.tvheap != NULL && b.tvheap->free > 0)
+			BBPcheckHeap(subcommit, b.tvheap);
+	}
+}
+
 /*
  * @+ Atomic Write
  * The atomic BBPsync() function first safeguards the old images of
@@ -3989,7 +4058,7 @@ BBPsync(int cnt, bat *restrict subcommit
 				bi = bat_iterator(NULL);
 			}
 			if (ret == GDK_SUCCEED) {
-				n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf, nbbpf, subcommit != NULL, &bi);
+				n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf, nbbpf, &bi);
 			}
 			bat_iterator_end(&bi);
 			if (n == -2)
@@ -4016,6 +4085,9 @@ BBPsync(int cnt, bat *restrict subcommit
 		 * succeeded, so no changing of ret after this
 		 * call anymore */
 
+		if ((GDKdebug & TAILCHKMASK) && !GDKinmemory(0))
+			BBPcheckBBPdir(subcommit != NULL);
+
 		if (MT_rename(bakdir, deldir) < 0 &&
 		    /* maybe there was an old deldir, so remove and try again */
 		    (GDKremovedir(0, DELDIR) != GDK_SUCCEED ||
--- a/gdk/gdk_calc.c
+++ b/gdk/gdk_calc.c
@@ -15074,7 +15074,6 @@ BATcalcifthenelse_intern(BAT *b,
 	bat_iterator_end(&bi);
 
 	BATsetcount(bn, cnt);
-	bn->theap->dirty = true;
 
 	bn->tsorted = cnt <= 1;
 	bn->trevsorted = cnt <= 1;
@@ -16016,7 +16015,6 @@ convert_any_str(BAT *b, BAT *bn, struct 
 		}
 	}
 	bat_iterator_end(&bi);
-	bn->theap->dirty = true;
 	BATsetcount(bn, ncand);
 	GDKfree(dst);
 	return nils;
--- a/gdk/gdk_delta.c
+++ b/gdk/gdk_delta.c
@@ -99,8 +99,6 @@ BATundo(BAT *b)
 			}
 		}
 	}
-	b->theap->free = tailsize(b, b->batInserted);
-
 	BATsetcount(b, b->batInserted);
 	MT_lock_unset(&b->theaplock);
 }
--- a/gdk/gdk_hash.c
+++ b/gdk/gdk_hash.c
@@ -143,11 +143,13 @@ HASHnew(Hash *h, int tpe, BUN size, BUN 
 		if (HEAPalloc(&h->heaplink, size, h->width, 0) != GDK_SUCCEED)
 			return GDK_FAIL;
 		h->heaplink.free = size * h->width;
+		h->heaplink.dirty = true;
 		h->Link = h->heaplink.base;
 	}
 	if (HEAPalloc(&h->heapbckt, mask + HASH_HEADER_SIZE * SIZEOF_SIZE_T / h->width, h->width, 0) != GDK_SUCCEED)
 		return GDK_FAIL;
 	h->heapbckt.free = mask * h->width + HASH_HEADER_SIZE * SIZEOF_SIZE_T;
+	h->heapbckt.dirty = true;
 	h->nbucket = mask;
 	if (mask & (mask - 1)) {
 		h->mask2 = hashmask(mask);
@@ -239,6 +241,8 @@ HASHupgradehashheap(BAT *b)
 				BUN2type v = ((BUN2type *) h->Bckt)[i];
 				((BUN4type *) h->Bckt)[i] = v == BUN2_NONE ? BUN4_NONE : v;
 			}
+			h->heapbckt.dirty = true;
+			h->heaplink.dirty = true;
 			break;
 		}
 #endif
@@ -262,6 +266,8 @@ HASHupgradehashheap(BAT *b)
 				BUN2type v = ((BUN2type *) h->Bckt)[i];
 				((BUN8type *) h->Bckt)[i] = v == BUN2_NONE ? BUN8_NONE : v;
 			}
+			h->heapbckt.dirty = true;
+			h->heaplink.dirty = true;
 			break;
 #endif
 		case BUN4:
@@ -279,6 +285,8 @@ HASHupgradehashheap(BAT *b)
 				BUN4type v = ((BUN4type *) h->Bckt)[i];
 				((BUN8type *) h->Bckt)[i] = v == BUN4_NONE ? BUN8_NONE : v;
 			}
+			h->heapbckt.dirty = true;
+			h->heaplink.dirty = true;
 			break;
 		}
 		break;
--- a/gdk/gdk_heap.c
+++ b/gdk/gdk_heap.c
@@ -1037,6 +1037,8 @@ HEAP_empty(Heap *heap, size_t nprivate, 
 	assert(heap->size - head <= VAR_MAX);
 	headp->size = (size_t) (heap->size - head);
 	headp->next = 0;
+
+	heap->dirty = true;
 }
 
 gdk_return
@@ -1123,6 +1125,7 @@ HEAP_malloc(BAT *b, size_t nbytes)
 		}
 		heap = b->tvheap;
 		heap->free = newsize;
+		heap->dirty = true;
 		hheader = HEAP_index(heap, 0, HEADER);
 
 		blockp = HEAP_index(heap, block, CHUNK);
@@ -1361,6 +1364,8 @@ HEAP_recover(Heap *h, const var_t *offse
 		if (h->storage == STORE_MMAP) {
 			if (!(GDKdebug & NOSYNCMASK))
 				(void) MT_msync(h->base, dirty);
+			else
+				h->dirty = true;
 		} else
 			h->dirty = true;
 	}
--- a/gdk/gdk_imprints.c
+++ b/gdk/gdk_imprints.c
@@ -629,6 +629,7 @@ BATimprints(BAT *b)
 		((size_t *) imprints->imprints.base)[2] = (size_t) imprints->dictcnt;
 		((size_t *) imprints->imprints.base)[3] = (size_t) bi.count;
 		imprints->imprints.parentid = b->batCacheid;
+		imprints->imprints.dirty = true;
 		MT_lock_set(&b->theaplock);
 		if (b->batCount != bi.count) {
 			/* bat changed under our feet, can't use imprints */
--- a/gdk/gdk_orderidx.c
+++ b/gdk/gdk_orderidx.c
@@ -150,6 +150,7 @@ createOIDXheap(BAT *b, bool stable)
 		return NULL;
 	}
 	m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
+	m->dirty = true;
 
 	mv = (oid *) m->base;
 	*mv++ = ORDERIDX_VERSION;
@@ -376,6 +377,7 @@ GDKmergeidx(BAT *b, BAT**a, int n_ar)
 		return GDK_FAIL;
 	}
 	m->free = (BATcount(b) + ORDERIDXOFF) * SIZEOF_OID;
+	m->dirty = true;
 
 	mv = (oid *) m->base;
 	*mv++ = ORDERIDX_VERSION;
--- a/gdk/gdk_project.c
+++ b/gdk/gdk_project.c
@@ -417,6 +417,7 @@ project_str(BAT *restrict l, struct cand
 #endif
 		memcpy(bn->tvheap->base + h1off, r2i->vh->base, r2i->vhfree);
 		bn->tvheap->free = h1off + r2i->vhfree;
+		bn->tvheap->dirty = true;
 	}
 
 	if (v >= ((var_t) 1 << (8 << bn->tshift)) &&
--- a/monetdb5/modules/kernel/batstr.c
+++ b/monetdb5/modules/kernel/batstr.c
@@ -39,7 +39,7 @@ batstr_func_has_candidates(const char *f
 	return true;
 }
 
-static void
+static inline void
 finalize_ouput(bat *res, BAT *bn, str msg, bool nils, BUN q)
 {
 	if (bn && !msg) {
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -941,7 +941,6 @@ SQLworker_column(READERtask *task, int c
 		}
 	}
 	BATsetcount(fmt[col].c, BATcount(fmt[col].c));
-	fmt[col].c->theap->dirty |= BATcount(fmt[col].c) > 0;
 
 	return 0;
 }