LCOV - code coverage report
Current view: top level - gdk - gdk_logger_old.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 1139 0.0 %
Date: 2024-04-25 20:03:45 Functions: 0 37 0.0 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : /*
      14             :  * (author) N. J. Nes
      15             :  *
      16             :  * In the philosophy of MonetDB, transaction management overhead
      17             :  * should only be paid when necessary. Transaction management is for
      18             :  * this purpose implemented as a separate module and applications are
      19             :  * required to obey the transaction policy, e.g. obtaining/releasing
      20             :  * locks.
      21             :  *
      22             :  * This module is designed to support efficient logging of the SQL
      23             :  * database.  Once loaded, the SQL compiler will insert the proper
      24             :  * calls at transaction commit to include the changes in the log file.
      25             :  *
      26             :  * The logger uses a directory to store its log files. One master log
      27             :  * file stores information about the version of the logger and the
      28             :  * transaction log files. This file is a simple ascii file with the
      29             :  * following format:
      30             :  *  {6DIGIT-VERSION\n[log file number \n]*}
      31             :  * The transaction log files have a binary format, which stores fixed
      32             :  * size logformat headers (flag,nr,bid), where the flag is the type of
      33             :  * update logged.  The nr field indicates how many changes there were
      34             :  * (in case of inserts/deletes).  The bid stores the bid identifier.
      35             :  *
      36             :  * The key decision to be made by the user is the location of the log
      37             :  * file.  Ideally, it should be stored in a fail-safe environment, or at
      38             :  * least the log and databases should be on separate disk columns.
      39             :  *
      40             :  * This file system may reside on the same hardware as the database
      41             :  * server and therefore the writes are done to the same disk, but
      42             :  * could also reside on another system and then the changes are
      43             :  * flushed through the network.  The logger works under the assumption
      44             :  * that it is called to safeguard updates on the database when it has
      45             :  * an exclusive lock on the latest version. This lock should be
      46             :  * guaranteed by the calling transaction manager first.
      47             :  *
      48             :  * Finding the updates applied to a BAT is relatively easy, because
      49             :  * each BAT contains a delta structure. On commit these changes are
      50             :  * written to the log file and the delta management is reset. Since
      51             :  * each commit is written to the same log file, the beginning and end
      52             :  * are marked by a log identifier.
      53             :  *
      54             :  * A server restart should only (re)process blocks which are
      55             :  * completely written to disk. A log replay therefore ends in a commit
      56             :  * or abort on the changed bats. Once all logs have been read, the
      57             :  * changes to the bats are made persistent, i.e. a bbp sub-commit is
      58             :  * done.
      59             :  */
      60             : #include "monetdb_config.h"
      61             : #include "gdk.h"
      62             : #include "gdk_private.h"
      63             : #include "gdk_logger.h"
      64             : #include "gdk_logger_internals.h"
      65             : #include "mutils.h"
      66             : #include <string.h>
      67             : 
      68             : /*
      69             :  * The log record encoding is geared at reduced storage space, but at
      70             :  * the expense of readability. A user can not easily inspect the log a
      71             :  * posteriori to check what has happened.
      72             :  *
                     :  * The LOG_* constants below are the record type codes; the reader
                     :  * compares the flag field of a logformat header against them (see
                     :  * the l->flag tests in log_read_updates).  The value 4 has no
                     :  * constant in this list.
      73             :  */
      74             : #define LOG_START       1
      75             : #define LOG_END         2
      76             : #define LOG_INSERT      3
      77             : #define LOG_UPDATE      5
      78             : #define LOG_CREATE      6
      79             : #define LOG_DESTROY     7
      80             : #define LOG_USE         8
      81             : #define LOG_CLEAR       9
      82             : #define LOG_SEQ         10
      83             : #define LOG_INSERT_ID   11
      84             : #define LOG_UPDATE_ID   12
      85             : #define LOG_CREATE_ID   13
      86             : #define LOG_DESTROY_ID  14
      87             : #define LOG_USE_ID      15
      88             : #define LOG_CLEAR_ID    16
      89             : #define LOG_UPDATE_PAX  17
      90             : 
      /*
       * getfilepos: name of the function used to query a file position.
       * _ftelli64 on NATIVE_WIN32 builds, otherwise ftello when HAVE_FSEEKO
       * is defined, and plain ftell as the last resort.
       */
      91             : #ifdef NATIVE_WIN32
      92             : #define getfilepos _ftelli64
      93             : #else
      94             : #ifdef HAVE_FSEEKO
      95             : #define getfilepos ftello
      96             : #else
      97             : #define getfilepos ftell
      98             : #endif
      99             : #endif
     100             : 
     /*
      * NOTE(review): BATSIZE is not referenced in the visible part of this
      * file; presumably an initial capacity for logger BATs -- confirm.
      */
     #define BATSIZE 0

     /*
      * Printable name for debug output: the given name when it is non-NULL,
      * otherwise the literal placeholder "tpe id".  The tpe and id arguments
      * are accepted but never expanded.  The name argument is parenthesized
      * so that compound expressions (e.g. a conditional expression) keep
      * their meaning; note that it may be evaluated twice.
      */
     #define NAME(name,tpe,id) ((name) ? (name) : "tpe id")

     /* Non-zero when bit 128 of the owning logger's debug mask is set. */
     #define LOG_DISABLED(lg) ((lg)->lg->debug&128)
     106             : 
     /* Forward declarations of file-local helpers that are used before their definitions. */
     107             : static gdk_return logger_cleanup(old_logger *lg);
     108             : static gdk_return logger_add_bat(old_logger *lg, BAT *b, const char *name, char tpe, oid id);
     109             : static gdk_return logger_del_bat(old_logger *lg, log_bid bid);
     110             : 
     /*
      * Printable names of the log record types, indexed by the numeric
      * LOG_* code.  Slot 0 is unused and slot 4 keeps the name "LOG_DELETE",
      * a code for which this file defines no constant.
      *
      * Every entry must sit at the index equal to its LOG_* value.  There is
      * deliberately no "LOG_DELETE_ID" entry: LOG_UPDATE_ID is 12, directly
      * after LOG_INSERT_ID (11), so an extra entry at index 12 would make
      * every later code print the name of its predecessor.
      */
     static const char *log_commands[] = {
             NULL,                   /*  0: unused */
             "LOG_START",            /*  1 */
             "LOG_END",              /*  2 */
             "LOG_INSERT",           /*  3 */
             "LOG_DELETE",           /*  4: no LOG_* constant defined */
             "LOG_UPDATE",           /*  5 */
             "LOG_CREATE",           /*  6 */
             "LOG_DESTROY",          /*  7 */
             "LOG_USE",              /*  8 */
             "LOG_CLEAR",            /*  9 */
             "LOG_SEQ",              /* 10 */
             "LOG_INSERT_ID",        /* 11 */
             "LOG_UPDATE_ID",        /* 12 */
             "LOG_CREATE_ID",        /* 13 */
             "LOG_DESTROY_ID",       /* 14 */
             "LOG_USE_ID",           /* 15 */
             "LOG_CLEAR_ID",         /* 16 */
             "LOG_UPDATE_PAX",       /* 17 */
     };
     132             : 
     /*
      * One change collected for a transaction while the log is being read.
      * Entries are stored in the changes array of a trans.
      */
     133             : typedef struct logaction {
     134             :         int type;               /* type of change (a LOG_* code, e.g. LOG_CLEAR) */
     135             :         lng nr;                 /* for LOG_USE entries log_read_updates reads this as a log_bid */
     136             :         int ht;                 /* head type; log_read_updates treats a negative value as TYPE_void */
     137             :         int tt;                 /* tail type; a negative value makes log_read_updates use a dense void tail */
     138             :         lng id;
     139             :         char *name;             /* optional; log_read_clear stores a GDKstrdup'ed copy here */
     140             :         char tpe;               /* tpe of column */
     141             :         oid cid;                /* id of object */
     142             :         BAT *b;                 /* temporary bat with changes */
     143             :         BAT *uid;               /* temporary bat with bun positions to update */
     144             : } logaction;
     145             : 
     146             : /* During recovery several transactions can be active at the same time; each is described by one trans record. */
     147             : typedef struct trans {
     148             :         int tid;                /* transaction id */
     149             :         int sz;                 /* allocated size of the changes array */
     150             :         int nr;                 /* number of entries of changes in use */
     151             : 
     152             :         logaction *changes;     /* tr_grow is called before entry changes[nr] is filled in */
     153             : 
     154             :         struct trans *tr;       /* link to another trans; presumably chains the active transactions -- confirm */
     155             : } trans;
     156             : 
     /*
      * Header of one log record.  log_read_format reads the fields from the
      * stream in the order flag (1 byte), nr, tid -- not the member order.
      */
     157             : typedef struct logformat_t {
     158             :         char flag;
     159             :         int tid;
     160             :         lng nr;
     161             : } logformat;
     162             : 
     /*
      * Result of the log reading helpers: LOG_EOF is returned when reading
      * from the log stream fails, LOG_ERR for other failures (e.g. a failed
      * BUNappend or tr_grow).
      */
     163             : typedef enum {LOG_OK, LOG_EOF, LOG_ERR} log_return;
     164             : 
     /* Called before a new entry tr->changes[tr->nr] is filled in (see log_read_clear). */
     165             : static gdk_return tr_grow(trans *tr);
     166             : 
     167             : static BUN
     168           0 : log_find(BAT *b, BAT *d, int val)
     169             : {
     170           0 :         BATiter cni = bat_iterator_nolock(b);
     171           0 :         BUN p;
     172             : 
     173           0 :         assert(b->ttype == TYPE_int);
     174           0 :         assert(d->ttype == TYPE_oid);
     175           0 :         if (BAThash(b) == GDK_SUCCEED) {
     176           0 :                 MT_rwlock_rdlock(&cni.b->thashlock);
     177           0 :                 HASHloop_int(cni, cni.b->thash, p, &val) {
     178           0 :                         oid pos = p;
     179           0 :                         if (BUNfnd(d, &pos) == BUN_NONE) {
     180           0 :                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     181           0 :                                 return p;
     182             :                         }
     183             :                 }
     184           0 :                 MT_rwlock_rdunlock(&cni.b->thashlock);
     185             :         } else {                /* unlikely: BAThash failed */
     186           0 :                 BUN q;
     187           0 :                 int *t = (int *) Tloc(b, 0);
     188             : 
     189           0 :                 for (p = 0, q = BATcount(b); p < q; p++) {
     190           0 :                         if (t[p] == val) {
     191           0 :                                 oid pos = p;
     192           0 :                                 if (BUNfnd(d, &pos) == BUN_NONE) {
     193           0 :                                         return p;
     194             :                                 }
     195             :                         }
     196             :                 }
     197             :         }
     198             :         return BUN_NONE;
     199             : }
     200             : 
     /*
      * Release a BAT used by the logger by handing it to BBPreclaim.
      * NOTE(review): log_read_updates passes a pointer that can be NULL, so
      * BBPreclaim is presumably NULL-tolerant -- confirm.
      */
     201             : static inline void
     202           0 : logbat_destroy(BAT *b)
     203             : {
     204           0 :         BBPreclaim(b);
     205           0 : }
     206             : 
     207             : static BAT *
     208           0 : logbat_new(int tt, BUN size, role_t role)
     209             : {
     210           0 :         BAT *nb = COLnew(0, tt, size, role);
     211             : 
     212           0 :         if (nb) {
     213           0 :                 if (role == PERSISTENT)
     214           0 :                         BATmode(nb, false);
     215             :         } else {
     216           0 :                 TRC_CRITICAL(GDK, "creating new BAT[%s]#" BUNFMT " failed\n", ATOMname(tt), size);
     217             :         }
     218           0 :         return nb;
     219             : }
     220             : 
     221             : static int
     222           0 : log_read_format(old_logger *l, logformat *data)
     223             : {
     224           0 :         return mnstr_read(l->log, &data->flag, 1, 1) == 1 &&
     225           0 :                 mnstr_readLng(l->log, &data->nr) == 1 &&
     226           0 :                 mnstr_readInt(l->log, &data->tid) == 1;
     227             : }
     228             : 
     229             : static char *
     230           0 : log_read_string(old_logger *l)
     231             : {
     232           0 :         int len;
     233           0 :         ssize_t nr;
     234           0 :         char *buf;
     235             : 
     236           0 :         if (mnstr_readInt(l->log, &len) != 1) {
     237           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     238             : //MK This leads to non-repeatable log structure?
     239           0 :                 return NULL;
     240             :         }
     241           0 :         if (len == 0)
     242             :                 return NULL;
     243           0 :         buf = GDKmalloc(len);
     244           0 :         if (buf == NULL) {
     245           0 :                 TRC_CRITICAL(GDK, "malloc failed\n");
     246             :                 /* this is bad */
     247           0 :                 return (char *) -1;
     248             :         }
     249             : 
     250           0 :         if ((nr = mnstr_read(l->log, buf, 1, len)) != (ssize_t) len) {
     251           0 :                 buf[len - 1] = 0;
     252           0 :                 TRC_CRITICAL(GDK, "couldn't read name (%s) %zd\n", buf, nr);
     253           0 :                 GDKfree(buf);
     254           0 :                 return NULL;
     255             :         }
     256           0 :         buf[len - 1] = 0;
     257           0 :         return buf;
     258             : }
     259             : 
     260             : static log_return
     261           0 : log_read_clear(old_logger *lg, trans *tr, char *name, char tpe, oid id)
     262             : {
     263           0 :         if (lg->lg->debug & 1)
     264           0 :                 fprintf(stderr, "#logger found log_read_clear %s\n", NAME(name, tpe, id));
     265           0 :         if (tr_grow(tr) != GDK_SUCCEED)
     266             :                 return LOG_ERR;
     267           0 :         tr->changes[tr->nr].type = LOG_CLEAR;
     268           0 :         tr->changes[tr->nr].tpe = tpe;
     269           0 :         tr->changes[tr->nr].cid = id;
     270           0 :         if (name && (tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
     271             :                 return LOG_ERR;
     272           0 :         tr->nr++;
     273           0 :         return LOG_OK;
     274             : }
     275             : 
     276             : static int
     277           0 : avoid_snapshot(old_logger *lg, log_bid bid)
     278             : {
     279           0 :         if (BATcount(lg->snapshots_bid)-BATcount(lg->dsnapshots)) {
     280           0 :                 BUN p = log_find(lg->snapshots_bid, lg->dsnapshots, bid);
     281             : 
     282           0 :                 if (p != BUN_NONE) {
     283           0 :                         int tid = *(int *) Tloc(lg->snapshots_tid, p);
     284             : 
     285           0 :                         if (lg->tid <= tid)
     286             :                                 return 1;
     287             :                 }
     288             :         }
     289             :         return 0;
     290             : }
     291             : 
     292             : log_bid
     293           0 : old_logger_find_bat(old_logger *lg, const char *name, char tpe, oid id)
     294             : {
     295           0 :         if (!tpe || !lg->with_ids) {
     296           0 :                 BATiter cni = bat_iterator_nolock(lg->catalog_nme);
     297           0 :                 BUN p;
     298             : 
     299           0 :                 assert(name != NULL);
     300           0 :                 if (BAThash(lg->catalog_nme) == GDK_SUCCEED) {
     301           0 :                         MT_rwlock_rdlock(&cni.b->thashlock);
     302           0 :                         HASHloop_str(cni, cni.b->thash, p, name) {
     303           0 :                                 oid pos = p;
     304           0 :                                 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
     305           0 :                                         oid lid = *(oid*) Tloc(lg->catalog_oid, p);
     306           0 :                                         if (!lid) {
     307           0 :                                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     308           0 :                                                 return *(log_bid *) Tloc(lg->catalog_bid, p);
     309             :                                         }
     310             :                                 }
     311             :                         }
     312           0 :                         MT_rwlock_rdunlock(&cni.b->thashlock);
     313           0 :                         return 0; /* not found */
     314             :                 }
     315             :         } else {
     316           0 :                 BATiter cni = bat_iterator_nolock(lg->catalog_oid);
     317           0 :                 BUN p;
     318             : 
     319           0 :                 if (BAThash(lg->catalog_oid) == GDK_SUCCEED) {
     320           0 :                         lng lid = (lng) id;
     321           0 :                         MT_rwlock_rdlock(&cni.b->thashlock);
     322           0 :                         HASHloop_lng(cni, cni.b->thash, p, &lid) {
     323           0 :                                 oid pos = p;
     324           0 :                                 if (*(char*)Tloc(lg->catalog_tpe, p) == tpe) {
     325           0 :                                         if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE) {
     326           0 :                                                 MT_rwlock_rdunlock(&cni.b->thashlock);
     327           0 :                                                 return *(log_bid *) Tloc(lg->catalog_bid, p);
     328             :                                         }
     329             :                                 }
     330             :                         }
     331           0 :                         MT_rwlock_rdunlock(&cni.b->thashlock);
     332           0 :                         return 0; /* not found */
     333             :                 }
     334             :         }
     335             :         return -1;              /* BAThash failed */
     336             : }
     337             : 
     338             : static gdk_return
     339           0 : la_bat_clear(old_logger *lg, logaction *la)
     340             : {
     341           0 :         log_bid bid = old_logger_find_bat(lg, la->name, la->tpe, la->cid);
     342           0 :         BAT *b;
     343             : 
     344           0 :         if (bid < 0)
     345             :                 return GDK_FAIL;
     346             : 
     347           0 :         if (lg->lg->debug & 1)
     348           0 :                 fprintf(stderr, "#la_bat_clear %s\n", NAME(la->name, la->tpe, la->cid));
     349             : 
     350             :         /* do we need to skip these old updates */
     351           0 :         if (avoid_snapshot(lg, bid))
     352             :                 return GDK_SUCCEED;
     353             : 
     354           0 :         b = BATdescriptor(bid);
     355           0 :         if (b) {
     356           0 :                 restrict_t access = b->batRestricted;
     357           0 :                 b->batRestricted = BAT_WRITE;
     358           0 :                 BATclear(b, true);
     359           0 :                 b->batRestricted = access;
     360           0 :                 logbat_destroy(b);
     361             :         }
     362             :         return GDK_SUCCEED;
     363             : }
     364             : 
     365             : static log_return
     366           0 : log_read_seq(old_logger *lg, logformat *l)
     367             : {
     368           0 :         int seq = (int) l->nr;
     369           0 :         lng val;
     370           0 :         BUN p;
     371             : 
     372           0 :         assert(l->nr <= (lng) INT_MAX);
     373           0 :         if (mnstr_readLng(lg->log, &val) != 1) {
     374           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     375           0 :                 return LOG_EOF;
     376             :         }
     377             : 
     378           0 :         if ((p = log_find(lg->seqs_id, lg->dseqs, seq)) != BUN_NONE &&
     379           0 :             p >= lg->seqs_id->batInserted) {
     380           0 :                 assert(lg->seqs_val->hseqbase == 0);
     381           0 :                 if (BUNreplace(lg->seqs_val, p, &val, false) != GDK_SUCCEED)
     382             :                         return LOG_ERR;
     383             :         } else {
     384           0 :                 if (p != BUN_NONE) {
     385           0 :                         oid pos = p;
     386           0 :                         if (BUNappend(lg->dseqs, &pos, true) != GDK_SUCCEED)
     387           0 :                                 return LOG_ERR;
     388             :                 }
     389           0 :                 if (BUNappend(lg->seqs_id, &seq, true) != GDK_SUCCEED ||
     390           0 :                     BUNappend(lg->seqs_val, &val, true) != GDK_SUCCEED)
     391           0 :                         return LOG_ERR;
     392             :         }
     393             :         return LOG_OK;
     394             : }
     395             : 
     396             : static log_return
     397           0 : log_read_id(old_logger *lg, char *tpe, oid *id)
     398             : {
     399           0 :         lng lid;
     400             : 
     401           0 :         if (mnstr_readChr(lg->log, tpe) != 1 ||
     402           0 :             mnstr_readLng(lg->log, &lid) != 1) {
     403           0 :                 TRC_CRITICAL(GDK, "read failed\n");
     404           0 :                 return LOG_EOF;
     405             :         }
     406           0 :         *id = (oid)lid;
     407           0 :         return LOG_OK;
     408             : }
     409             : 
     410             : static log_return
     411           0 : log_read_updates(old_logger *lg, trans *tr, logformat *l, char *name, int tpe, oid id, int pax)
     412             : {
     413           0 :         log_bid bid = old_logger_find_bat(lg, name, tpe, id);
     414           0 :         if (bid < 0)
     415             :                 return LOG_ERR;
     416           0 :         BAT *b = BATdescriptor(bid);
     417           0 :         log_return res = LOG_OK;
     418           0 :         int ht = -1, tt = -1, tseq = 0;
     419             : 
     420           0 :         if (lg->lg->debug & 1) {
     421           0 :                 if (name)
     422           0 :                         fprintf(stderr, "#logger found log_read_updates %s %s " LLFMT "\n", name, l->flag == LOG_INSERT ? "insert" : "update", l->nr);
     423             :                 else
     424           0 :                         fprintf(stderr, "#logger found log_read_updates " OIDFMT " %s " LLFMT "\n", id, l->flag == LOG_INSERT ? "insert" : "update", l->nr);
     425             :         }
     426             : 
     427           0 :         if (b) {
     428           0 :                 ht = TYPE_void;
     429           0 :                 tt = b->ttype;
     430           0 :                 if (tt == TYPE_void && BATtdense(b))
     431           0 :                         tseq = 1;
     432             :         } else {                /* search trans action for create statement */
     433             :                 int i;
     434             : 
     435           0 :                 for (i = 0; i < tr->nr; i++) {
     436           0 :                         if (tr->changes[i].type == LOG_CREATE &&
     437           0 :                             (tpe == 0 && name != NULL
     438           0 :                              ? strcmp(tr->changes[i].name, name) == 0
     439           0 :                              : tr->changes[i].tpe == tpe && tr->changes[i].cid == id)) {
     440           0 :                                 ht = tr->changes[i].ht;
     441           0 :                                 if (ht < 0) {
     442             :                                         ht = TYPE_void;
     443             :                                 }
     444           0 :                                 tt = tr->changes[i].tt;
     445           0 :                                 if (tt < 0) {
     446           0 :                                         tseq = 1;
     447           0 :                                         tt = TYPE_void;
     448             :                                 }
     449             :                                 break;
     450           0 :                         } else if (tr->changes[i].type == LOG_USE &&
     451           0 :                                    (tpe == 0 && name != NULL
     452           0 :                                     ? strcmp(tr->changes[i].name, name) == 0
     453           0 :                                     : tr->changes[i].tpe == tpe && tr->changes[i].cid == id)) {
     454           0 :                                 log_bid bid = (log_bid) tr->changes[i].nr;
     455           0 :                                 BAT *b = BATdescriptor(bid);
     456             : 
     457           0 :                                 if (b) {
     458           0 :                                         ht = TYPE_void;
     459           0 :                                         tt = b->ttype;
     460             :                                 }
     461             :                                 break;
     462             :                         }
     463             :                 }
     464           0 :                 assert(i < tr->nr); /* found one */
     465             :         }
     466           0 :         assert((ht == TYPE_void && l->flag == LOG_INSERT) ||
     467             :                ((ht == TYPE_oid || !ht) && l->flag == LOG_UPDATE));
     468           0 :         if ((ht != TYPE_void && l->flag == LOG_INSERT) ||
     469           0 :            ((ht != TYPE_void && ht != TYPE_oid) && l->flag == LOG_UPDATE))
     470             :                 return LOG_ERR;
     471           0 :         if (ht >= 0 && tt >= 0) {
     472           0 :                 BAT *uid = NULL;
     473           0 :                 BAT *r;
     474           0 :                 void *(*rt) (ptr, size_t *, stream *, size_t) = BATatoms[tt].atomRead;
     475             : 
     476           0 :                 assert(l->nr <= (lng) BUN_MAX);
     477           0 :                 if (l->flag == LOG_UPDATE) {
     478           0 :                         uid = COLnew(0, ht, (BUN) l->nr, PERSISTENT);
     479           0 :                         if (uid == NULL) {
     480           0 :                                 logbat_destroy(b);
     481           0 :                                 return LOG_ERR;
     482             :                         }
     483             :                 } else {
     484           0 :                         assert(ht == TYPE_void);
     485             :                 }
     486           0 :                 r = COLnew(0, tt, (BUN) l->nr, PERSISTENT);
     487           0 :                 if (r == NULL) {
     488           0 :                         BBPreclaim(uid);
     489           0 :                         logbat_destroy(b);
     490           0 :                         return LOG_ERR;
     491             :                 }
     492             : 
     493           0 :                 if (tseq)
     494           0 :                         BATtseqbase(r, 0);
     495             : 
     496           0 :                 if (ht == TYPE_void && l->flag == LOG_INSERT) {
     497           0 :                         lng nr = l->nr;
     498           0 :                         for (; res == LOG_OK && nr > 0; nr--) {
     499           0 :                                 size_t tlen = lg->lg->rbufsize;
     500           0 :                                 void *t = rt(lg->lg->rbuf, &tlen, lg->log, 1);
     501             : 
     502           0 :                                 if (t == NULL) {
     503             :                                         /* see if failure was due to
     504             :                                          * malloc or something less
     505             :                                          * serious (in the current
     506             :                                          * context) */
     507           0 :                                         if (strstr(GDKerrbuf, "alloc") == NULL)
     508             :                                                 res = LOG_EOF;
     509             :                                         else
     510           0 :                                                 res = LOG_ERR;
     511           0 :                                         break;
     512             :                                 } else {
     513           0 :                                         lg->lg->rbuf = t;
     514           0 :                                         lg->lg->rbufsize = tlen;
     515             :                                 }
     516           0 :                                 if (BUNappend(r, t, true) != GDK_SUCCEED)
     517           0 :                                         res = LOG_ERR;
     518             :                         }
     519             :                 } else {
     520           0 :                         void *(*rh) (ptr, size_t *, stream *, size_t) = ht == TYPE_void ? BATatoms[TYPE_oid].atomRead : BATatoms[ht].atomRead;
     521           0 :                         void *hv = ATOMnil(ht);
     522           0 :                         size_t hlen = ATOMsize(ht);
     523             : 
     524           0 :                         if (hv == NULL)
     525           0 :                                 res = LOG_ERR;
     526             : 
     527           0 :                         if (!pax) {
     528           0 :                                 lng nr = l->nr;
     529           0 :                                 for (; res == LOG_OK && nr > 0; nr--) {
     530           0 :                                         size_t tlen = lg->lg->rbufsize;
     531           0 :                                         void *h = rh(hv, &hlen, lg->log, 1);
     532           0 :                                         void *t = rt(lg->lg->rbuf, &tlen, lg->log, 1);
     533             : 
     534           0 :                                         if (t != NULL) {
     535           0 :                                                 lg->lg->rbuf = t;
     536           0 :                                                 lg->lg->rbufsize = tlen;
     537             :                                         }
     538           0 :                                         if (h == NULL)
     539             :                                                 res = LOG_EOF;
     540           0 :                                         else if (t == NULL) {
     541           0 :                                                 if (strstr(GDKerrbuf, "malloc") == NULL)
     542             :                                                         res = LOG_EOF;
     543             :                                                 else
     544           0 :                                                         res = LOG_ERR;
     545           0 :                                         } else if (BUNappend(uid, h, true) != GDK_SUCCEED ||
     546           0 :                                                 BUNappend(r, t, true) != GDK_SUCCEED)
     547             :                                                 res = LOG_ERR;
     548             :                                 }
     549             :                         } else {
     550           0 :                                 char compressed = 0;
     551           0 :                                 lng nr = l->nr;
     552             : 
     553           0 :                                 if (mnstr_read(lg->log, &compressed, 1, 1) != 1)
     554           0 :                                         return LOG_ERR;
     555             : 
     556           0 :                                 if (compressed) {
     557           0 :                                         void *h = rh(hv, &hlen, lg->log, 1);
     558             : 
     559           0 :                                         assert(uid->ttype == TYPE_void);
     560           0 :                                         if (h == NULL)
     561             :                                                 res = LOG_EOF;
     562             :                                         else {
     563           0 :                                                 BATtseqbase(uid, *(oid*)h);
     564           0 :                                                 BATsetcount(uid, (BUN) l->nr);
     565             :                                         }
     566             :                                 } else {
     567           0 :                                         for (; res == LOG_OK && nr > 0; nr--) {
     568           0 :                                                 void *h = rh(hv, &hlen, lg->log, 1);
     569             : 
     570           0 :                                                 if (h == NULL)
     571             :                                                         res = LOG_EOF;
     572           0 :                                                 else if (BUNappend(uid, h, true) != GDK_SUCCEED)
     573           0 :                                                         res = LOG_ERR;
     574             :                                         }
     575             :                                 }
     576           0 :                                 nr = l->nr;
     577           0 :                                 for (; res == LOG_OK && nr > 0; nr--) {
     578           0 :                                         size_t tlen = lg->lg->rbufsize;
     579           0 :                                         void *t = rt(lg->lg->rbuf, &tlen, lg->log, 1);
     580             : 
     581           0 :                                         if (t == NULL) {
     582           0 :                                                 if (strstr(GDKerrbuf, "malloc") == NULL)
     583             :                                                         res = LOG_EOF;
     584             :                                                 else
     585           0 :                                                         res = LOG_ERR;
     586             :                                         } else {
     587           0 :                                                 lg->lg->rbuf = t;
     588           0 :                                                 lg->lg->rbufsize = tlen;
     589           0 :                                                 if (BUNappend(r, t, true) != GDK_SUCCEED)
     590           0 :                                                         res = LOG_ERR;
     591             :                                         }
     592             :                                 }
     593             :                         }
     594           0 :                         GDKfree(hv);
     595             :                 }
     596             : 
     597           0 :                 if (res == LOG_OK) {
     598           0 :                         if (tr_grow(tr) == GDK_SUCCEED) {
     599           0 :                                 tr->changes[tr->nr].type = l->flag;
     600           0 :                                 tr->changes[tr->nr].nr = l->nr;
     601           0 :                                 tr->changes[tr->nr].ht = ht;
     602           0 :                                 tr->changes[tr->nr].tt = tt;
     603           0 :                                 tr->changes[tr->nr].tpe = tpe;
     604           0 :                                 tr->changes[tr->nr].cid = id;
     605           0 :                                 if (name && (tr->changes[tr->nr].name = GDKstrdup(name)) == NULL) {
     606           0 :                                         logbat_destroy(b);
     607           0 :                                         BBPreclaim(uid);
     608           0 :                                         BBPreclaim(r);
     609           0 :                                         return LOG_ERR;
     610             :                                 }
     611           0 :                                 tr->changes[tr->nr].b = r;
     612           0 :                                 tr->changes[tr->nr].uid = uid;
     613           0 :                                 tr->nr++;
     614             :                         } else {
     615             :                                 res = LOG_ERR;
     616             :                         }
     617             :                 }
     618             :         } else {
     619             :                 /* bat missing ERROR or ignore ? currently error. */
     620             :                 res = LOG_ERR;
     621             :         }
     622           0 :         logbat_destroy(b);
     623             :         return res;
     624             : }
     625             : 
     626             : static gdk_return
     627           0 : la_bat_updates(old_logger *lg, logaction *la)
     628             : {
     629           0 :         log_bid bid = old_logger_find_bat(lg, la->name, la->tpe, la->cid);
     630           0 :         BAT *b;
     631             : 
     632           0 :         if (bid < 0)
     633             :                 return GDK_FAIL;
     634           0 :         if (bid == 0)
     635             :                 return GDK_SUCCEED; /* ignore bats no longer in the catalog */
     636             : 
     637             :         /* do we need to skip these old updates */
     638           0 :         if (avoid_snapshot(lg, bid))
     639             :                 return GDK_SUCCEED;
     640             : 
     641           0 :         b = BATdescriptor(bid);
     642           0 :         if (b == NULL)
     643             :                 return GDK_FAIL;
     644           0 :         if (la->type == LOG_INSERT) {
     645           0 :                 if (BATappend(b, la->b, NULL, true) != GDK_SUCCEED) {
     646           0 :                         logbat_destroy(b);
     647           0 :                         return GDK_FAIL;
     648             :                 }
     649           0 :         } else if (la->type == LOG_UPDATE) {
     650           0 :                 BATiter vi = bat_iterator(la->b);
     651           0 :                 BUN p, q;
     652             : 
     653           0 :                 BATloop(la->b, p, q) {
     654           0 :                         oid h = BUNtoid(la->uid, p);
     655           0 :                         const void *t = BUNtail(vi, p);
     656             : 
     657           0 :                         if (h < b->hseqbase || h >= b->hseqbase + BATcount(b)) {
     658             :                                 /* if value doesn't exist, insert it;
     659             :                                  * if b void headed, maintain that by
     660             :                                  * inserting nils */
     661           0 :                                 if (b->batCount == 0 && !is_oid_nil(h))
     662           0 :                                         b->hseqbase = h;
     663           0 :                                 if (!is_oid_nil(b->hseqbase) && !is_oid_nil(h)) {
     664           0 :                                         const void *tv = ATOMnilptr(b->ttype);
     665             : 
     666           0 :                                         while (b->hseqbase + b->batCount < h) {
     667           0 :                                                 if (BUNappend(b, tv, true) != GDK_SUCCEED) {
     668           0 :                                                         logbat_destroy(b);
     669           0 :                                                         bat_iterator_end(&vi);
     670           0 :                                                         return GDK_FAIL;
     671             :                                                 }
     672             :                                         }
     673             :                                 }
     674           0 :                                 if (BUNappend(b, t, true) != GDK_SUCCEED) {
     675           0 :                                         logbat_destroy(b);
     676           0 :                                         bat_iterator_end(&vi);
     677           0 :                                         return GDK_FAIL;
     678             :                                 }
     679             :                         } else {
     680           0 :                                 if (BUNreplace(b, h, t, true) != GDK_SUCCEED) {
     681           0 :                                         logbat_destroy(b);
     682           0 :                                         bat_iterator_end(&vi);
     683           0 :                                         return GDK_FAIL;
     684             :                                 }
     685             :                         }
     686             :                 }
     687           0 :                 bat_iterator_end(&vi);
     688             :         }
     689           0 :         logbat_destroy(b);
     690           0 :         return GDK_SUCCEED;
     691             : }
     692             : 
     693             : static log_return
     694           0 : log_read_destroy(old_logger *lg, trans *tr, char *name, char tpe, oid id)
     695             : {
     696           0 :         (void) lg;
     697           0 :         if (tr_grow(tr) == GDK_SUCCEED) {
     698           0 :                 tr->changes[tr->nr].type = LOG_DESTROY;
     699           0 :                 tr->changes[tr->nr].tpe = tpe;
     700           0 :                 tr->changes[tr->nr].cid = id;
     701           0 :                 if (name && (tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
     702             :                         return LOG_ERR;
     703           0 :                 tr->nr++;
     704             :         }
     705             :         return LOG_OK;
     706             : }
     707             : 
     708             : static gdk_return
     709           0 : la_bat_destroy(old_logger *lg, logaction *la)
     710             : {
     711           0 :         log_bid bid = old_logger_find_bat(lg, la->name, la->tpe, la->cid);
     712             : 
     713           0 :         if (bid < 0)
     714             :                 return GDK_FAIL;
     715           0 :         if (bid) {
     716           0 :                 BUN p;
     717             : 
     718           0 :                 if (logger_del_bat(lg, bid) != GDK_SUCCEED)
     719             :                         return GDK_FAIL;
     720             : 
     721           0 :                 if ((p = log_find(lg->snapshots_bid, lg->dsnapshots, bid)) != BUN_NONE) {
     722           0 :                         oid pos = (oid) p;
     723             : #ifndef NDEBUG
     724           0 :                         BAT *b = BBP_desc(bid);
     725           0 :                         assert(b->batRole == PERSISTENT);
     726           0 :                         assert(0 <= b->theap->farmid && b->theap->farmid < MAXFARMS);
     727           0 :                         assert(BBPfarms[b->theap->farmid].roles & (1 << PERSISTENT));
     728           0 :                         if (b->tvheap) {
     729           0 :                                 assert(0 <= b->tvheap->farmid && b->tvheap->farmid < MAXFARMS);
     730           0 :                                 assert(BBPfarms[b->tvheap->farmid].roles & (1 << PERSISTENT));
     731             :                         }
     732             : #endif
     733           0 :                         if (BUNappend(lg->dsnapshots, &pos, true) != GDK_SUCCEED)
     734           0 :                                 return GDK_FAIL;
     735             :                 }
     736             :         }
     737             :         return GDK_SUCCEED;
     738             : }
     739             : 
     740             : static log_return
     741           0 : log_read_create(old_logger *lg, trans *tr, char *name, char tpe, oid id)
     742             : {
     743           0 :         char *buf = log_read_string(lg);
     744           0 :         int ht, tt;
     745           0 :         char *ha, *ta;
     746             : 
     747           0 :         if (lg->lg->debug & 1)
     748           0 :                 fprintf(stderr, "#log_read_create %s\n", name);
     749             : 
     750           0 :         if (buf == NULL)
     751             :                 return LOG_EOF;
     752           0 :         if (buf == (char *) -1)
     753             :                 return LOG_ERR;
     754           0 :         ha = buf;
     755           0 :         ta = strchr(buf, ',');
     756           0 :         if (ta == NULL) {
     757           0 :                 TRC_CRITICAL(GDK, "inconsistent data read\n");
     758           0 :                 return LOG_ERR;
     759             :         }
     760           0 :         *ta++ = 0;              /* skip over , */
     761           0 :         if (strcmp(ha, "vid") == 0) {
     762             :                 ht = -1;
     763             :         } else {
     764           0 :                 ht = ATOMindex(ha);
     765             :         }
     766           0 :         if (strcmp(ta, "vid") == 0) {
     767             :                 tt = -1;
     768             :         } else {
     769           0 :                 tt = ATOMindex(ta);
     770             :         }
     771           0 :         GDKfree(buf);
     772           0 :         if (tr_grow(tr) == GDK_SUCCEED) {
     773           0 :                 tr->changes[tr->nr].type = LOG_CREATE;
     774           0 :                 tr->changes[tr->nr].ht = ht;
     775           0 :                 tr->changes[tr->nr].tt = tt;
     776           0 :                 tr->changes[tr->nr].tpe = tpe;
     777           0 :                 tr->changes[tr->nr].cid = id;
     778           0 :                 if ((tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
     779             :                         return LOG_ERR;
     780           0 :                 tr->changes[tr->nr].b = NULL;
     781           0 :                 tr->nr++;
     782             :         }
     783             : 
     784             :         return LOG_OK;
     785             : }
     786             : 
     787             : static gdk_return
     788           0 : la_bat_create(old_logger *lg, logaction *la)
     789             : {
     790           0 :         int tt = (la->tt < 0) ? TYPE_void : la->tt;
     791           0 :         BAT *b;
     792             : 
     793             :         /* formerly head column type, should be void */
     794           0 :         assert(((la->ht < 0) ? TYPE_void : la->ht) == TYPE_void);
     795           0 :         if ((b = COLnew(0, tt, BATSIZE, PERSISTENT)) == NULL)
     796             :                 return GDK_FAIL;
     797             : 
     798           0 :         if (la->tt < 0)
     799           0 :                 BATtseqbase(b, 0);
     800             : 
     801           0 :         if ((b = BATsetaccess(b, BAT_READ)) == NULL ||
     802           0 :             logger_add_bat(lg, b, la->name, la->tpe, la->cid) != GDK_SUCCEED) {
     803           0 :                 logbat_destroy(b);
     804           0 :                 return GDK_FAIL;
     805             :         }
     806           0 :         logbat_destroy(b);
     807           0 :         return GDK_SUCCEED;
     808             : }
     809             : 
     810             : static log_return
     811           0 : log_read_use(old_logger *lg, trans *tr, logformat *l, char *name, char tpe, oid id)
     812             : {
     813           0 :         (void) lg;
     814             : 
     815           0 :         if (tr_grow(tr) != GDK_SUCCEED)
     816             :                 return LOG_ERR;
     817           0 :         tr->changes[tr->nr].type = LOG_USE;
     818           0 :         tr->changes[tr->nr].nr = l->nr;
     819           0 :         tr->changes[tr->nr].tpe = tpe;
     820           0 :         tr->changes[tr->nr].cid = id;
     821           0 :         if ((tr->changes[tr->nr].name = GDKstrdup(name)) == NULL)
     822             :                 return LOG_ERR;
     823           0 :         tr->changes[tr->nr].b = NULL;
     824           0 :         tr->nr++;
     825           0 :         return LOG_OK;
     826             : }
     827             : 
     828             : static gdk_return
     829           0 : la_bat_use(old_logger *lg, logaction *la)
     830             : {
     831           0 :         log_bid bid = (log_bid) la->nr;
     832           0 :         BAT *b = BATdescriptor(bid);
     833           0 :         BUN p;
     834             : 
     835           0 :         assert(la->nr <= (lng) INT_MAX);
     836           0 :         if (b == NULL) {
     837           0 :                 GDKerror("logger: could not use bat (%d) for %s\n", (int) bid, NAME(la->name, la->tpe, la->cid));
     838           0 :                 return GDK_FAIL;
     839             :         }
     840           0 :         if (logger_add_bat(lg, b, la->name, la->tpe, la->cid) != GDK_SUCCEED)
     841           0 :                 goto bailout;
     842             : #ifndef NDEBUG
     843           0 :         assert(b->batRole == PERSISTENT);
     844           0 :         assert(0 <= b->theap->farmid && b->theap->farmid < MAXFARMS);
     845           0 :         assert(BBPfarms[b->theap->farmid].roles & (1 << PERSISTENT));
     846           0 :         if (b->tvheap) {
     847           0 :                 assert(0 <= b->tvheap->farmid && b->tvheap->farmid < MAXFARMS);
     848           0 :                 assert(BBPfarms[b->tvheap->farmid].roles & (1 << PERSISTENT));
     849             :         }
     850             : #endif
     851           0 :         if ((p = log_find(lg->snapshots_bid, lg->dsnapshots, b->batCacheid)) != BUN_NONE &&
     852           0 :             p >= lg->snapshots_bid->batInserted) {
     853           0 :                 assert(lg->snapshots_tid->hseqbase == 0);
     854           0 :                 if (BUNreplace(lg->snapshots_tid, p, &lg->tid, true) != GDK_SUCCEED)
     855           0 :                         goto bailout;
     856             :         } else {
     857           0 :                 if (p != BUN_NONE) {
     858           0 :                         oid pos = p;
     859           0 :                         if (BUNappend(lg->dsnapshots, &pos, true) != GDK_SUCCEED)
     860           0 :                                 goto bailout;
     861             :                 }
     862             :                 /* move to the dirty new part of the snapshots list,
     863             :                  * new snapshots will get flushed to disk */
     864           0 :                 if (BUNappend(lg->snapshots_bid, &b->batCacheid, true) != GDK_SUCCEED ||
     865           0 :                     BUNappend(lg->snapshots_tid, &lg->tid, true) != GDK_SUCCEED)
     866           0 :                         goto bailout;
     867             :         }
     868           0 :         logbat_destroy(b);
     869           0 :         return GDK_SUCCEED;
     870             : 
     871           0 :   bailout:
     872           0 :         logbat_destroy(b);
     873           0 :         return GDK_FAIL;
     874             : }
     875             : 
     876             : 
     877             : #define TR_SIZE         1024
     878             : 
     879             : static trans *
     880           0 : tr_create(trans *tr, int tid)
     881             : {
     882           0 :         trans *ntr = GDKmalloc(sizeof(trans));
     883             : 
     884           0 :         if (ntr == NULL)
     885             :                 return NULL;
     886           0 :         ntr->tid = tid;
     887           0 :         ntr->sz = TR_SIZE;
     888           0 :         ntr->nr = 0;
     889           0 :         ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
     890           0 :         if (ntr->changes == NULL) {
     891           0 :                 GDKfree(ntr);
     892           0 :                 return NULL;
     893             :         }
     894           0 :         ntr->tr = tr;
     895           0 :         return ntr;
     896             : }
     897             : 
     898             : static trans *
     899           0 : tr_find(trans *tr, int tid)
     900             : /* finds the tid and reorders the chain list, puts trans with tid first */
     901             : {
     902           0 :         trans *t = tr, *p = NULL;
     903             : 
     904           0 :         while (t && t->tid != tid) {
     905           0 :                 p = t;
     906           0 :                 t = t->tr;
     907             :         }
     908           0 :         if (t == NULL)
     909             :                 return NULL;    /* BAD missing transaction */
     910           0 :         if (t == tr)
     911             :                 return tr;
     912           0 :         if (t->tr)           /* get this tid out of the list */
     913           0 :                 p->tr = t->tr;
     914           0 :         t->tr = tr;          /* and move it to the front */
     915           0 :         return t;
     916             : }
     917             : 
     918             : static gdk_return
     919           0 : la_apply(old_logger *lg, logaction *c)
     920             : {
     921           0 :         gdk_return ret = GDK_FAIL;
     922             : 
     923           0 :         switch (c->type) {
     924           0 :         case LOG_INSERT:
     925             :         case LOG_UPDATE:
     926           0 :                 ret = la_bat_updates(lg, c);
     927           0 :                 break;
     928           0 :         case LOG_CREATE:
     929           0 :                 ret = la_bat_create(lg, c);
     930           0 :                 break;
     931           0 :         case LOG_USE:
     932           0 :                 ret = la_bat_use(lg, c);
     933           0 :                 break;
     934           0 :         case LOG_DESTROY:
     935           0 :                 ret = la_bat_destroy(lg, c);
     936           0 :                 break;
     937           0 :         case LOG_CLEAR:
     938           0 :                 ret = la_bat_clear(lg, c);
     939           0 :                 break;
     940             :         default:
     941           0 :                 MT_UNREACHABLE();
     942             :         }
     943           0 :         lg->changes += (ret == GDK_SUCCEED);
     944           0 :         return ret;
     945             : }
     946             : 
     947             : static void
     948           0 : la_destroy(logaction *c)
     949             : {
     950           0 :         if (c->name)
     951           0 :                 GDKfree(c->name);
     952           0 :         if (c->b)
     953           0 :                 logbat_destroy(c->b);
     954           0 : }
     955             : 
     956             : static gdk_return
     957           0 : tr_grow(trans *tr)
     958             : {
     959           0 :         if (tr->nr == tr->sz) {
     960           0 :                 logaction *changes;
     961           0 :                 tr->sz <<= 1;
     962           0 :                 changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
     963           0 :                 if (changes == NULL)
     964             :                         return GDK_FAIL;
     965           0 :                 tr->changes = changes;
     966             :         }
     967             :         /* cleanup the next */
     968           0 :         tr->changes[tr->nr].name = NULL;
     969           0 :         tr->changes[tr->nr].b = NULL;
     970           0 :         return GDK_SUCCEED;
     971             : }
     972             : 
     973             : static trans *
     974           0 : tr_destroy(trans *tr)
     975             : {
     976           0 :         trans *r = tr->tr;
     977             : 
     978           0 :         GDKfree(tr->changes);
     979           0 :         GDKfree(tr);
     980           0 :         return r;
     981             : }
     982             : 
     983             : static trans *
     984           0 : tr_abort(old_logger *lg, trans *tr)
     985             : {
     986           0 :         int i;
     987             : 
     988           0 :         if (lg->lg->debug & 1)
     989           0 :                 fprintf(stderr, "#tr_abort\n");
     990             : 
     991           0 :         for (i = 0; i < tr->nr; i++)
     992           0 :                 la_destroy(&tr->changes[i]);
     993           0 :         return tr_destroy(tr);
     994             : }
     995             : 
     996             : static trans *
     997           0 : tr_commit(old_logger *lg, trans *tr)
     998             : {
     999           0 :         int i;
    1000             : 
    1001           0 :         if (lg->lg->debug & 1)
    1002           0 :                 fprintf(stderr, "#tr_commit\n");
    1003             : 
    1004           0 :         for (i = 0; i < tr->nr; i++) {
    1005           0 :                 if (la_apply(lg, &tr->changes[i]) != GDK_SUCCEED) {
    1006           0 :                         do {
    1007           0 :                                 tr = tr_abort(lg, tr);
    1008           0 :                         } while (tr != NULL);
    1009             :                         return (trans *) -1;
    1010             :                 }
    1011           0 :                 la_destroy(&tr->changes[i]);
    1012             :         }
    1013           0 :         return tr_destroy(tr);
    1014             : }
    1015             : 
    1016             : static inline void
    1017           0 : logger_close(old_logger *lg)
    1018             : {
    1019           0 :         if (!LOG_DISABLED(lg))
    1020           0 :                 close_stream(lg->log);
    1021           0 :         lg->log = NULL;
    1022           0 : }
    1023             : 
    1024             : static gdk_return
    1025           0 : logger_readlog(old_logger *lg, char *filename, bool *filemissing)
    1026             : {
    1027           0 :         trans *tr = NULL;
    1028           0 :         logformat l;
    1029           0 :         log_return err = LOG_OK;
    1030           0 :         time_t t0, t1;
    1031           0 :         struct stat sb;
    1032           0 :         ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
    1033           0 :         int fd;
    1034             : 
    1035           0 :         ATOMIC_AND(&GDKdebug, ~CHECKMASK);
    1036             : 
    1037           0 :         if (lg->lg->debug & 1) {
    1038           0 :                 fprintf(stderr, "#logger_readlog opening %s\n", filename);
    1039             :         }
    1040             : 
    1041           0 :         lg->log = open_rstream(filename);
    1042             : 
    1043             :         /* if the file doesn't exist, there is nothing to be read back */
    1044           0 :         if (lg->log == NULL || mnstr_errnr(lg->log) != MNSTR_NO__ERROR) {
    1045           0 :                 logger_close(lg);
    1046           0 :                 ATOMIC_SET(&GDKdebug, dbg);
    1047           0 :                 *filemissing = true;
    1048           0 :                 return GDK_SUCCEED;
    1049             :         }
    1050           0 :         short byteorder;
    1051           0 :         switch (mnstr_read(lg->log, &byteorder, sizeof(byteorder), 1)) {
    1052           0 :         case -1:
    1053           0 :                 logger_close(lg);
    1054           0 :                 ATOMIC_SET(&GDKdebug, dbg);
    1055           0 :                 return GDK_FAIL;
    1056           0 :         case 0:
    1057             :                 /* empty file is ok */
    1058           0 :                 logger_close(lg);
    1059           0 :                 ATOMIC_SET(&GDKdebug, dbg);
    1060           0 :                 return GDK_SUCCEED;
    1061           0 :         case 1:
    1062             :                 /* if not empty, must start with correct byte order mark */
    1063           0 :                 if (byteorder != 1234) {
    1064           0 :                         TRC_CRITICAL(GDK, "incorrect byte order word in file %s\n", filename);
    1065           0 :                         logger_close(lg);
    1066           0 :                         ATOMIC_SET(&GDKdebug, dbg);
    1067           0 :                         return GDK_FAIL;
    1068             :                 }
    1069             :                 break;
    1070             :         }
    1071           0 :         if ((fd = getFileNo(lg->log)) < 0 || fstat(fd, &sb) < 0) {
    1072           0 :                 TRC_CRITICAL(GDK, "fstat on opened file %s failed\n", filename);
    1073           0 :                 logger_close(lg);
    1074           0 :                 ATOMIC_SET(&GDKdebug, dbg);
    1075             :                 /* If the file could be opened, but fstat fails,
    1076             :                  * something weird is going on */
    1077           0 :                 return GDK_FAIL;
    1078             :         }
    1079           0 :         t0 = time(NULL);
    1080           0 :         if (lg->lg->debug & 1) {
    1081           0 :                 printf("# Start reading the write-ahead log '%s'\n", filename);
    1082           0 :                 fflush(stdout);
    1083             :         }
    1084           0 :         while (err == LOG_OK && log_read_format(lg, &l)) {
    1085           0 :                 char *name = NULL;
    1086           0 :                 char tpe;
    1087           0 :                 oid id;
    1088             : 
    1089           0 :                 if (l.flag == 0) {
    1090             :                         /* end of useful content */
    1091           0 :                         assert(l.tid == 0);
    1092           0 :                         assert(l.nr == 0);
    1093           0 :                         break;
    1094             :                 }
    1095           0 :                 t1 = time(NULL);
    1096           0 :                 if (t1 - t0 > 10) {
    1097           0 :                         lng fpos;
    1098           0 :                         t0 = t1;
    1099             :                         /* not more than once every 10 seconds */
    1100           0 :                         fpos = (lng) getfilepos(getFile(lg->log));
    1101           0 :                         if (fpos >= 0) {
    1102           0 :                                 printf("# still reading write-ahead log \"%s\" (%d%% done)\n", filename, (int) ((fpos * 100 + 50) / sb.st_size));
    1103           0 :                                 fflush(stdout);
    1104             :                         }
    1105             :                 }
    1106           0 :                 if ((l.flag >= LOG_INSERT && l.flag <= LOG_CLEAR) || l.flag == LOG_CREATE_ID || l.flag == LOG_USE_ID) {
    1107           0 :                         name = log_read_string(lg);
    1108             : 
    1109           0 :                         if (name == NULL) {
    1110             :                                 err = LOG_EOF;
    1111             :                                 break;
    1112             :                         }
    1113           0 :                         if (name == (char *) -1) {
    1114             :                                 err = LOG_ERR;
    1115             :                                 break;
    1116             :                         }
    1117             :                 }
    1118           0 :                 if (lg->lg->debug & 1) {
    1119           0 :                         fprintf(stderr, "#logger_readlog: ");
    1120           0 :                         if (l.flag > 0 &&
    1121             :                             l.flag < (char) (sizeof(log_commands) / sizeof(log_commands[0])))
    1122           0 :                                 fprintf(stderr, "%s", log_commands[(int) l.flag]);
    1123             :                         else
    1124           0 :                                 fprintf(stderr, "%d", l.flag);
    1125           0 :                         fprintf(stderr, " %d " LLFMT, l.tid, l.nr);
    1126           0 :                         if (name)
    1127           0 :                                 fprintf(stderr, " %s", name);
    1128           0 :                         fprintf(stderr, "\n");
    1129             :                 }
    1130             :                 /* find proper transaction record */
    1131           0 :                 if (l.flag != LOG_START)
    1132           0 :                         tr = tr_find(tr, l.tid);
    1133             :                 /* the functions we call here can succeed (LOG_OK),
    1134             :                  * but they can also fail for two different reasons:
    1135             :                  * they can run out of input (LOG_EOF -- this is not
    1136             :                  * serious, we just abort the remaining transactions),
    1137             :                  * or some malloc or BAT update fails (LOG_ERR -- this
    1138             :                  * is serious, we must abort the complete process);
    1139             :                  * the latter failure causes the current function to
    1140             :                  * return GDK_FAIL */
    1141           0 :                 switch (l.flag) {
    1142           0 :                 case LOG_START:
    1143           0 :                         assert(l.nr <= (lng) INT_MAX);
    1144           0 :                         if (l.nr > lg->tid)
    1145           0 :                                 lg->tid = (int)l.nr;
    1146           0 :                         if ((tr = tr_create(tr, (int)l.nr)) == NULL) {
    1147             :                                 err = LOG_ERR;
    1148             :                                 break;
    1149             :                         }
    1150           0 :                         if (lg->lg->debug & 1)
    1151           0 :                                 fprintf(stderr, "#logger tstart %d\n", tr->tid);
    1152             :                         break;
    1153           0 :                 case LOG_END:
    1154           0 :                         if (tr == NULL)
    1155             :                                 err = LOG_EOF;
    1156           0 :                         else if (l.tid != l.nr) /* abort record */
    1157           0 :                                 tr = tr_abort(lg, tr);
    1158             :                         else
    1159           0 :                                 tr = tr_commit(lg, tr);
    1160             :                         break;
    1161           0 :                 case LOG_SEQ:
    1162           0 :                         err = log_read_seq(lg, &l);
    1163           0 :                         break;
    1164           0 :                 case LOG_INSERT:
    1165             :                 case LOG_UPDATE:
    1166           0 :                         if (name == NULL || tr == NULL)
    1167             :                                 err = LOG_EOF;
    1168             :                         else
    1169           0 :                                 err = log_read_updates(lg, tr, &l, name, 0, 0, 0);
    1170             :                         break;
    1171           0 :                 case LOG_INSERT_ID:
    1172             :                 case LOG_UPDATE_ID:
    1173             :                 case LOG_UPDATE_PAX: {
    1174           0 :                         int pax = (l.flag == LOG_UPDATE_PAX);
    1175           0 :                         l.flag = (l.flag == LOG_INSERT_ID)?LOG_INSERT:LOG_UPDATE;
    1176           0 :                         if (log_read_id(lg, &tpe, &id) != LOG_OK)
    1177             :                                 err = LOG_ERR;
    1178             :                         else
    1179           0 :                                 err = log_read_updates(lg, tr, &l, name, tpe, id, pax);
    1180             :                         break;
    1181             :                 }
    1182           0 :                 case LOG_CREATE:
    1183           0 :                         if (name == NULL || tr == NULL)
    1184             :                                 err = LOG_EOF;
    1185             :                         else
    1186           0 :                                 err = log_read_create(lg, tr, name, 0, 0);
    1187             :                         break;
    1188           0 :                 case LOG_CREATE_ID:
    1189           0 :                         l.flag = LOG_CREATE;
    1190           0 :                         if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
    1191             :                                 err = LOG_EOF;
    1192             :                         else
    1193           0 :                                 err = log_read_create(lg, tr, name, tpe, id);
    1194             :                         break;
    1195           0 :                 case LOG_USE:
    1196           0 :                         if (name == NULL || tr == NULL)
    1197             :                                 err = LOG_EOF;
    1198             :                         else
    1199           0 :                                 err = log_read_use(lg, tr, &l, name, 0, 0);
    1200             :                         break;
    1201           0 :                 case LOG_USE_ID:
    1202           0 :                         l.flag = LOG_USE;
    1203           0 :                         if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
    1204             :                                 err = LOG_EOF;
    1205             :                         else
    1206           0 :                                 err = log_read_use(lg, tr, &l, name, tpe, id);
    1207             :                         break;
    1208           0 :                 case LOG_DESTROY:
    1209           0 :                         if (name == NULL || tr == NULL)
    1210             :                                 err = LOG_EOF;
    1211             :                         else
    1212           0 :                                 err = log_read_destroy(lg, tr, name, 0, 0);
    1213             :                         break;
    1214           0 :                 case LOG_DESTROY_ID:
    1215           0 :                         l.flag = LOG_DESTROY;
    1216           0 :                         if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
    1217             :                                 err = LOG_EOF;
    1218             :                         else
    1219           0 :                                 err = log_read_destroy(lg, tr, name, tpe, id);
    1220             :                         break;
    1221           0 :                 case LOG_CLEAR:
    1222           0 :                         if (name == NULL || tr == NULL)
    1223             :                                 err = LOG_EOF;
    1224             :                         else
    1225           0 :                                 err = log_read_clear(lg, tr, name, 0, 0);
    1226             :                         break;
    1227           0 :                 case LOG_CLEAR_ID:
    1228           0 :                         l.flag = LOG_CLEAR;
    1229           0 :                         if (tr == NULL || log_read_id(lg, &tpe, &id) != LOG_OK)
    1230             :                                 err = LOG_EOF;
    1231             :                         else
    1232           0 :                                 err = log_read_clear(lg, tr, name, tpe, id);
    1233             :                         break;
    1234             :                 default:
    1235             :                         err = LOG_ERR;
    1236             :                 }
    1237           0 :                 if (name)
    1238           0 :                         GDKfree(name);
    1239           0 :                 if (tr == (trans *) -1) {
    1240             :                         err = LOG_ERR;
    1241             :                         tr = NULL;
    1242             :                         break;
    1243             :                 }
    1244             :         }
    1245           0 :         logger_close(lg);
    1246             : 
    1247             :         /* remaining transactions are not committed, ie abort */
    1248           0 :         while (tr)
    1249           0 :                 tr = tr_abort(lg, tr);
    1250           0 :         if (lg->lg->debug & 1) {
    1251           0 :                 printf("# Finished reading the write-ahead log '%s'\n", filename);
    1252           0 :                 fflush(stdout);
    1253             :         }
    1254           0 :         ATOMIC_SET(&GDKdebug, dbg);
    1255             :         /* we cannot distinguish errors from incomplete transactions
    1256             :          * (even if we would log aborts in the logs). So we simply
    1257             :          * abort and move to the next log file */
    1258           0 :         return err == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
    1259             : }
    1260             : 
    1261             : /*
    1262             :  * The log files are incrementally numbered, starting from 2. They are
    1263             :  * processed in the same sequence.  logger_readlogs() reads one line from fp, parses it as the last logger id, and hands the files named "<filename>.<id>" to logger_readlog() one by one.  It returns GDK_FAIL when a file name does not fit in FILENAME_MAX, otherwise the result of the last logger_readlog() call, or GDK_SUCCEED when nothing was read.
    1264             :  */
    1265             : static gdk_return
    1266           0 : logger_readlogs(old_logger *lg, FILE *fp, char *filename)
    1267             : {
    1268           0 :         gdk_return res = GDK_SUCCEED;
    1269           0 :         char id[BUFSIZ]; /* receives the line read from fp that holds the logger id */
    1270             : 
    1271           0 :         if (lg->lg->debug & 1) {
    1272           0 :                 fprintf(stderr, "#logger_readlogs logger id is " LLFMT "\n", lg->id);
    1273             :         }
    1274             : 
    1275           0 :         if (fgets(id, sizeof(id), fp) != NULL) { /* when no line can be read, no log file is processed and GDK_SUCCEED is returned */
    1276           0 :                 char log_filename[FILENAME_MAX];
    1277           0 :                 lng lid = strtoll(id, NULL, 10); /* no end-pointer or errno check: text without digits parses as 0 */
    1278             : 
    1279           0 :                 if (lg->lg->debug & 1) {
    1280           0 :                         fprintf(stderr, "#logger_readlogs last logger id written in %s is " LLFMT "\n", filename, lid);
    1281             :                 }
    1282             : 
    1283           0 :                 if (lid >= lg->id) { /* the id from the file is at least the logger's current id */
    1284           0 :                         bool filemissing = false; /* out-parameter of logger_readlog(); presumably set when the file does not exist -- confirm in logger_readlog() */
    1285             : 
    1286           0 :                         lg->id = lid; /* start reading at the id found in the file */
    1287           0 :                         while (res == GDK_SUCCEED && !filemissing) { /* stop at the first failure or once filemissing has been set */
    1288           0 :                                 if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) { /* snprintf reports the untruncated length, so this detects a name that did not fit */
    1289           0 :                                         GDKerror("Logger filename path is too large\n");
    1290           0 :                                         return GDK_FAIL;
    1291             :                                 }
    1292           0 :                                 res = logger_readlog(lg, log_filename, &filemissing);
    1293           0 :                                 if (!filemissing)
    1294           0 :                                         lg->id++; /* lg->id is advanced only when filemissing was not set, so it ends up at the id whose file was flagged */
    1295             :                         }
    1296             :                 } else { /* here lid < lg->id */
    1297           0 :                         bool filemissing = false;
    1298           0 :                         while (lid >= lg->id && res == GDK_SUCCEED) { /* NOTE(review): this branch is only reached with lid < lg->id and neither value changes before this test, so the loop body looks unreachable -- confirm */
    1299             :                                 if (snprintf(log_filename, sizeof(log_filename), "%s." LLFMT, filename, lg->id) >= FILENAME_MAX) {
    1300             :                                         GDKerror("Logger filename path is too large\n");
    1301             :                                         return GDK_FAIL;
    1302             :                                 }
    1303             :                                 res = logger_readlog(lg, log_filename, &filemissing);
    1304             :                                 /* Increment the id only at the end,
    1305             :                                  * since we want to re-read the last
    1306             :                                  * file.  That is because last time we
    1307             :                                  * read it, it was empty, since the
    1308             :                                  * logger creates empty files and
    1309             :                                  * fills them in later. */
    1310             :                                 lg->id++;
    1311             :                         }
    1312           0 :                         if (lid < lg->id) { /* still true when the loop above did not run */
    1313           0 :                                 lg->id = lid; /* fall back to the id read from the file */
    1314             :                         }
    1315             :                 }
    1316             :         }
    1317             :         return res;
    1318             : }
    1319             : 
    1320             : static gdk_return /* GDK_SUCCEED, or GDK_FAIL after GDKerror() has been called */
    1321           0 : check_version(old_logger *lg, FILE *fp, int version) /* version: the version number found in the log, which differs from lg->lg->version; fp: stream from which the two following newline characters are read */
    1322             : {
    1323             :         /* if these were equal we wouldn't have gotten here */
    1324           0 :         assert(version != lg->lg->version);
    1325             : 
    1326           0 :         if (lg->lg->prefuncp == NULL || /* no callback registered: the version mismatch cannot be accepted */
    1327           0 :             (*lg->lg->prefuncp)(lg->lg->funcdata, version, lg->lg->version) != GDK_SUCCEED) { /* the callback receives the found and the supported version; any result other than GDK_SUCCEED rejects the database */
    1328           0 :                 GDKerror("Incompatible database version %06d, "
    1329             :                          "this server supports version %06d.\n%s",
    1330             :                          version, lg->lg->version,
    1331             :                          version < lg->lg->version ? "Maybe you need to upgrade to an intermediate release first.\n" : ""); /* the extra hint is printed only when the found version is lower than the supported one */
    1332           0 :                 return GDK_FAIL;
    1333             :         }
    1334             : 
    1335           0 :         if (fgetc(fp) != '\n' ||         /* skip \n */
    1336           0 :             fgetc(fp) != '\n') {         /* skip \n; because of short-circuit evaluation this second read happens only when the first character was a newline */
    1337           0 :                 GDKerror("Badly formatted log file");
    1338           0 :                 return GDK_FAIL;
    1339             :         }
    1340             :         return GDK_SUCCEED;
    1341             : }
    1342             : 
    1343             : /* Load data from the logger logdir
    1344             :  * Initialize new directories and catalog files if none are present,
    1345             :  * unless running in read-only mode
    1346             :  * Load data and persist it in the BATs */
    1347             : static gdk_return
    1348           0 : logger_load(const char *fn, char filename[FILENAME_MAX], old_logger *lg, FILE *fp, int version)
    1349             : {
    1350           0 :         size_t len;
    1351           0 :         char bak[FILENAME_MAX];
    1352           0 :         str filenamestr = NULL;
    1353           0 :         log_bid snapshots_bid = 0;
    1354           0 :         bat catalog_bid, catalog_nme, catalog_tpe, catalog_oid, dcatalog;
    1355           0 :         ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
    1356             : 
    1357           0 :         assert(!LOG_DISABLED(lg));
    1358             : 
    1359           0 :         if ((filenamestr = GDKfilepath(0, lg->lg->dir, LOGFILE, NULL)) == NULL)
    1360           0 :                 goto error;
    1361           0 :         len = strcpy_len(filename, filenamestr, FILENAME_MAX);
    1362           0 :         GDKfree(filenamestr);
    1363           0 :         if (len >= FILENAME_MAX) {
    1364           0 :                 GDKerror("Logger filename path is too large\n");
    1365           0 :                 goto error;
    1366             :         }
    1367             : 
    1368           0 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_bid", NULL);
    1369           0 :         catalog_bid = BBPindex(bak);
    1370             : 
    1371           0 :         assert(catalog_bid != 0); /* has been checked by new logger */
    1372             : 
    1373             :         /* find the persistent catalog. As non persistent bats
    1374             :          * require a logical reference we also add a logical
    1375             :          * reference for the persistent bats */
    1376           0 :         size_t i;
    1377           0 :         BUN p, q;
    1378           0 :         BAT *b, *n, *t, *o, *d;
    1379             : 
    1380           0 :         b = BATdescriptor(catalog_bid);
    1381           0 :         if (b == NULL) {
    1382           0 :                 GDKerror("inconsistent database, catalog does not exist");
    1383           0 :                 goto error;
    1384             :         }
    1385             : 
    1386           0 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_nme", NULL);
    1387           0 :         catalog_nme = BBPindex(bak);
    1388           0 :         n = BATdescriptor(catalog_nme);
    1389           0 :         if (n == NULL) {
    1390           0 :                 BBPunfix(b->batCacheid);
    1391           0 :                 GDKerror("inconsistent database, catalog_nme does not exist");
    1392           0 :                 goto error;
    1393             :         }
    1394             : 
    1395           0 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_tpe", NULL);
    1396           0 :         catalog_tpe = BBPindex(bak);
    1397           0 :         t = BATdescriptor(catalog_tpe);
    1398           0 :         if (t == NULL) {
    1399           0 :                 t = logbat_new(TYPE_bte, BATSIZE, SYSTRANS);
    1400           0 :                 if (t == NULL
    1401           0 :                     ||BBPrename(t, bak) < 0) {
    1402           0 :                         BBPunfix(b->batCacheid);
    1403           0 :                         BBPunfix(n->batCacheid);
    1404           0 :                         BBPreclaim(t);
    1405           0 :                         GDKerror("inconsistent database, catalog_tpe does not exist");
    1406           0 :                         goto error;
    1407             :                 }
    1408           0 :                 for(i=0;i<BATcount(n); i++) {
    1409           0 :                         char zero = 0;
    1410           0 :                         if (BUNappend(t, &zero, false) != GDK_SUCCEED)
    1411           0 :                                 goto error;
    1412             :                 }
    1413           0 :                 lg->with_ids = false;
    1414             :         } else {
    1415           0 :                 if (BUNappend(lg->del, &t->batCacheid, false) != GDK_SUCCEED)
    1416           0 :                         goto error;
    1417           0 :                 BBPretain(t->batCacheid);
    1418             :         }
    1419             : 
    1420           0 :         strconcat_len(bak, sizeof(bak), fn, "_catalog_oid", NULL);
    1421           0 :         catalog_oid = BBPindex(bak);
    1422           0 :         o = BATdescriptor(catalog_oid);
    1423           0 :         if (o == NULL) {
    1424           0 :                 o = logbat_new(TYPE_lng, BATSIZE, SYSTRANS);
    1425           0 :                 if (o == NULL
    1426           0 :                     || BBPrename(o, bak) < 0) {
    1427           0 :                         BBPunfix(b->batCacheid);
    1428           0 :                         BBPunfix(n->batCacheid);
    1429           0 :                         BBPunfix(t->batCacheid);
    1430           0 :                         BBPreclaim(o);
    1431           0 :                         GDKerror("inconsistent database, catalog_oid does not exist");
    1432           0 :                         goto error;
    1433             :                 }
    1434           0 :                 for(i=0;i<BATcount(n); i++) {
    1435           0 :                         lng zero = 0;
    1436           0 :                         if (BUNappend(o, &zero, false) != GDK_SUCCEED)
    1437           0 :                                 goto error;
    1438             :                 }
    1439           0 :                 lg->with_ids = false;
    1440             :         } else {
    1441           0 :                 if (BUNappend(lg->del, &o->batCacheid, false) != GDK_SUCCEED)
    1442           0 :                         goto error;
    1443           0 :                 BBPretain(o->batCacheid);
    1444             :         }
    1445             : 
    1446           0 :         strconcat_len(bak, sizeof(bak), fn, "_dcatalog", NULL);
    1447           0 :         dcatalog = BBPindex(bak);
    1448           0 :         d = BATdescriptor(dcatalog);
    1449           0 :         if (d == NULL) {
    1450             :                 /* older database: create dcatalog and convert
    1451             :                  * catalog_bid and catalog_nme to
    1452             :                  * dense-headed */
    1453           0 :                 d = logbat_new(TYPE_oid, BATSIZE, SYSTRANS);
    1454           0 :                 if (d == NULL) {
    1455           0 :                         GDKerror("Logger_new: cannot create dcatalog bat");
    1456           0 :                         BBPunfix(b->batCacheid);
    1457           0 :                         BBPunfix(n->batCacheid);
    1458           0 :                         BBPunfix(t->batCacheid);
    1459           0 :                         BBPunfix(o->batCacheid);
    1460           0 :                         goto error;
    1461             :                 }
    1462           0 :                 if (BBPrename(d, bak) < 0) {
    1463           0 :                         BBPunfix(b->batCacheid);
    1464           0 :                         BBPunfix(n->batCacheid);
    1465           0 :                         BBPunfix(t->batCacheid);
    1466           0 :                         BBPunfix(o->batCacheid);
    1467           0 :                         goto error;
    1468             :                 }
    1469             :         } else {
    1470           0 :                 if (BUNappend(lg->del, &d->batCacheid, false) != GDK_SUCCEED)
    1471           0 :                         goto error;
    1472           0 :                 BBPretain(d->batCacheid);
    1473             :         }
    1474             : 
    1475           0 :         if ((lg->catalog_bid = BATsetaccess(b, BAT_READ)) == NULL ||
    1476           0 :             (lg->catalog_nme = BATsetaccess(n, BAT_READ)) == NULL ||
    1477           0 :             (lg->catalog_tpe = BATsetaccess(t, BAT_READ)) == NULL ||
    1478           0 :             (lg->catalog_oid = BATsetaccess(o, BAT_READ)) == NULL ||
    1479           0 :             (lg->dcatalog = BATsetaccess(d, BAT_READ)) == NULL)
    1480           0 :                 goto error;
    1481           0 :         if (BUNappend(lg->del, &b->batCacheid, false) != GDK_SUCCEED)
    1482           0 :                 goto error;
    1483           0 :         BBPretain(b->batCacheid);
    1484           0 :         if (BUNappend(lg->del, &n->batCacheid, false) != GDK_SUCCEED)
    1485           0 :                 goto error;
    1486           0 :         BBPretain(n->batCacheid);
    1487             : 
    1488           0 :         const log_bid *bids;
    1489           0 :         bids = (const log_bid *) Tloc(b, 0);
    1490           0 :         BATloop(b, p, q) {
    1491           0 :                 bat bid = bids[p];
    1492           0 :                 oid pos = p;
    1493             : 
    1494           0 :                 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE &&
    1495           0 :                     BBPretain(bid) == 0 &&
    1496           0 :                     BUNappend(lg->dcatalog, &pos, true) != GDK_SUCCEED)
    1497           0 :                         goto error;
    1498             :         }
    1499             : 
    1500           0 :         lg->freed = logbat_new(TYPE_int, 1, SYSTRANS);
    1501           0 :         if (lg->freed == NULL) {
    1502           0 :                 GDKerror("Logger_new: failed to create freed bat");
    1503           0 :                 goto error;
    1504             :         }
    1505           0 :         snapshots_bid = old_logger_find_bat(lg, "snapshots_bid", 0, 0);
    1506           0 :         if (snapshots_bid < 0)
    1507           0 :                 goto error;
    1508           0 :         if (snapshots_bid == 0) {
    1509           0 :                 lg->snapshots_bid = logbat_new(TYPE_int, 1, SYSTRANS);
    1510           0 :                 lg->snapshots_tid = logbat_new(TYPE_int, 1, SYSTRANS);
    1511           0 :                 lg->dsnapshots = logbat_new(TYPE_oid, 1, SYSTRANS);
    1512           0 :                 if (lg->snapshots_bid == NULL ||
    1513           0 :                     lg->snapshots_tid == NULL ||
    1514             :                     lg->dsnapshots == NULL) {
    1515           0 :                         GDKerror("Logger_new: failed to create snapshots bats");
    1516           0 :                         goto error;
    1517             :                 }
    1518             :         } else {
    1519           0 :                 bat snapshots_tid = old_logger_find_bat(lg, "snapshots_tid", 0, 0);
    1520           0 :                 bat dsnapshots = old_logger_find_bat(lg, "dsnapshots", 0, 0);
    1521             : 
    1522           0 :                 if (snapshots_tid < 0 || dsnapshots < 0)
    1523           0 :                         goto error;
    1524           0 :                 ATOMIC_AND(&GDKdebug, ~CHECKMASK);
    1525           0 :                 lg->snapshots_bid = BATdescriptor(snapshots_bid);
    1526           0 :                 if (lg->snapshots_bid == NULL) {
    1527           0 :                         GDKerror("inconsistent database, snapshots_bid does not exist");
    1528           0 :                         goto error;
    1529             :                 }
    1530           0 :                 lg->snapshots_tid = BATdescriptor(snapshots_tid);
    1531           0 :                 if (lg->snapshots_tid == NULL) {
    1532           0 :                         GDKerror("inconsistent database, snapshots_tid does not exist");
    1533           0 :                         goto error;
    1534             :                 }
    1535           0 :                 ATOMIC_SET(&GDKdebug, dbg);
    1536             : 
    1537           0 :                 if (dsnapshots) {
    1538           0 :                         lg->dsnapshots = BATdescriptor(dsnapshots);
    1539           0 :                         if (lg->dsnapshots == NULL) {
    1540           0 :                                 GDKerror("Logger_new: inconsistent database, snapshots_tid does not exist");
    1541           0 :                                 goto error;
    1542             :                         }
    1543           0 :                         if (BUNappend(lg->del, &dsnapshots, false) != GDK_SUCCEED)
    1544           0 :                                 goto error;
    1545           0 :                         BBPretain(dsnapshots);
    1546             :                 } else {
    1547           0 :                         lg->dsnapshots = logbat_new(TYPE_oid, 1, SYSTRANS);
    1548           0 :                         if (lg->dsnapshots == NULL) {
    1549           0 :                                 GDKerror("Logger_new: cannot create dsnapshots bat");
    1550           0 :                                 goto error;
    1551             :                         }
    1552             :                 }
    1553           0 :                 if (BUNappend(lg->del, &snapshots_bid, false) != GDK_SUCCEED)
    1554           0 :                         goto error;
    1555           0 :                 BBPretain(snapshots_bid);
    1556           0 :                 if (BUNappend(lg->del, &snapshots_tid, false) != GDK_SUCCEED)
    1557           0 :                         goto error;
    1558           0 :                 BBPretain(snapshots_tid);
    1559             :         }
    1560           0 :         if ((lg->snapshots_bid = BATsetaccess(lg->snapshots_bid, BAT_READ)) == NULL ||
    1561           0 :             (lg->snapshots_tid = BATsetaccess(lg->snapshots_tid, BAT_READ)) == NULL ||
    1562           0 :             (lg->dsnapshots = BATsetaccess(lg->dsnapshots, BAT_READ)) == NULL) {
    1563           0 :                 goto error;
    1564             :         }
    1565           0 :         strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
    1566           0 :         if (BBPindex(bak)) {
    1567           0 :                 lg->seqs_id = BATdescriptor(BBPindex(bak));
    1568           0 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
    1569           0 :                 lg->seqs_val = BATdescriptor(BBPindex(bak));
    1570           0 :                 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
    1571           0 :                 lg->dseqs = BATdescriptor(BBPindex(bak));
    1572             :         } else {
    1573           0 :                 lg->seqs_id = logbat_new(TYPE_int, 1, PERSISTENT);
    1574           0 :                 lg->seqs_val = logbat_new(TYPE_lng, 1, PERSISTENT);
    1575           0 :                 lg->dseqs = logbat_new(TYPE_oid, 1, PERSISTENT);
    1576           0 :                 if (lg->seqs_id == NULL ||
    1577           0 :                     lg->seqs_val == NULL ||
    1578             :                     lg->dseqs == NULL) {
    1579           0 :                         GDKerror("Logger_new: cannot create seqs bats");
    1580           0 :                         goto error;
    1581             :                 }
    1582             : 
    1583           0 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_id", NULL);
    1584           0 :                 if (BBPrename(lg->seqs_id, bak) < 0) {
    1585           0 :                         goto error;
    1586             :                 }
    1587             : 
    1588           0 :                 strconcat_len(bak, sizeof(bak), fn, "_seqs_val", NULL);
    1589           0 :                 if (BBPrename(lg->seqs_val, bak) < 0) {
    1590           0 :                         goto error;
    1591             :                 }
    1592             : 
    1593           0 :                 strconcat_len(bak, sizeof(bak), fn, "_dseqs", NULL);
    1594           0 :                 if (BBPrename(lg->dseqs, bak) < 0) {
    1595           0 :                         goto error;
    1596             :                 }
    1597           0 :                 if (BUNappend(lg->add, &lg->seqs_id->batCacheid, false) != GDK_SUCCEED)
    1598           0 :                         goto error;
    1599           0 :                 BBPretain(lg->seqs_id->batCacheid);
    1600           0 :                 if (BUNappend(lg->add, &lg->seqs_val->batCacheid, false) != GDK_SUCCEED)
    1601           0 :                         goto error;
    1602           0 :                 BBPretain(lg->seqs_val->batCacheid);
    1603           0 :                 if (BUNappend(lg->add, &lg->dseqs->batCacheid, false) != GDK_SUCCEED)
    1604           0 :                         goto error;
    1605           0 :                 BBPretain(lg->dseqs->batCacheid);
    1606             :         }
    1607           0 :         if ((lg->seqs_val = BATsetaccess(lg->seqs_val, BAT_READ)) == NULL ||
    1608           0 :             (lg->seqs_id = BATsetaccess(lg->seqs_id, BAT_READ)) == NULL ||
    1609           0 :             (lg->dseqs = BATsetaccess(lg->dseqs, BAT_READ)) == NULL) {
    1610           0 :                 goto error;
    1611             :         }
    1612             : 
    1613           0 :         if (check_version(lg, fp, version) != GDK_SUCCEED) {
    1614           0 :                 goto error;
    1615             :         }
    1616             : 
    1617           0 :         if (logger_readlogs(lg, fp, filename) != GDK_SUCCEED) {
    1618           0 :                 goto error;
    1619             :         }
    1620           0 :         fclose(fp);
    1621           0 :         fp = NULL;
    1622             : 
    1623           0 :         if (lg->lg->postfuncp &&
    1624           0 :             (*lg->lg->postfuncp)(lg->lg->funcdata, lg) != GDK_SUCCEED)
    1625           0 :                 goto error;
    1626           0 :         lg->lg->postfuncp = NULL; /* not again */
    1627             : 
    1628           0 :         if (BUNappend(lg->add, &lg->lg->catalog_bid->batCacheid, false) != GDK_SUCCEED)
    1629           0 :                 goto error;
    1630           0 :         BBPretain(lg->lg->catalog_bid->batCacheid);
    1631           0 :         if (BUNappend(lg->add, &lg->lg->catalog_id->batCacheid, false) != GDK_SUCCEED)
    1632           0 :                 goto error;
    1633           0 :         BBPretain(lg->lg->catalog_id->batCacheid);
    1634           0 :         if (BUNappend(lg->add, &lg->lg->dcatalog->batCacheid, false) != GDK_SUCCEED)
    1635           0 :                 goto error;
    1636           0 :         BBPretain(lg->lg->dcatalog->batCacheid);
    1637             : 
    1638           0 :         return GDK_SUCCEED;
    1639           0 :   error:
    1640           0 :         if (fp)
    1641           0 :                 fclose(fp);
    1642           0 :         logbat_destroy(lg->catalog_bid);
    1643           0 :         logbat_destroy(lg->catalog_nme);
    1644           0 :         logbat_destroy(lg->catalog_tpe);
    1645           0 :         logbat_destroy(lg->catalog_oid);
    1646           0 :         logbat_destroy(lg->dcatalog);
    1647           0 :         logbat_destroy(lg->snapshots_bid);
    1648           0 :         logbat_destroy(lg->snapshots_tid);
    1649           0 :         logbat_destroy(lg->dsnapshots);
    1650           0 :         logbat_destroy(lg->freed);
    1651           0 :         logbat_destroy(lg->seqs_id);
    1652           0 :         logbat_destroy(lg->seqs_val);
    1653           0 :         logbat_destroy(lg->dseqs);
    1654           0 :         bids = (const log_bid *) Tloc(lg->add, 0);
    1655           0 :         BATloop(lg->add, p, q) {
    1656           0 :                 BBPrelease(bids[p]);
    1657             :         }
    1658           0 :         logbat_destroy(lg->add);
    1659           0 :         bids = (const log_bid *) Tloc(lg->del, 0);
    1660           0 :         BATloop(lg->del, p, q) {
    1661           0 :                 BBPrelease(bids[p]);
    1662             :         }
    1663           0 :         logbat_destroy(lg->del);
    1664           0 :         GDKfree(lg);
    1665           0 :         return GDK_FAIL;
    1666             : }
    1667             : 
    1668             : /* Initialize a new logger
    1669             :  * It will load any data in the logdir and persist it in the BATs*/
    1670             : static old_logger *
    1671           0 : logger_new(logger *lg, const char *fn, const char *logdir, FILE *fp, int version, const char *logfile)
    1672             : {
    1673           0 :         old_logger *old_lg;
    1674           0 :         char filename[FILENAME_MAX];
    1675             : 
    1676           0 :         assert(!GDKinmemory(0));
    1677           0 :         assert(lg != NULL);
    1678           0 :         if (GDKinmemory(0)) {
    1679           0 :                 TRC_CRITICAL(GDK, "old logger can only be used with a disk-based database\n");
    1680           0 :                 fclose(fp);
    1681           0 :                 return NULL;
    1682             :         }
    1683           0 :         if (MT_path_absolute(logdir)) {
    1684           0 :                 TRC_CRITICAL(GDK, "logdir must be relative path\n");
    1685           0 :                 fclose(fp);
    1686           0 :                 return NULL;
    1687             :         }
    1688             : 
    1689           0 :         old_lg = GDKmalloc(sizeof(struct old_logger));
    1690           0 :         if (old_lg == NULL) {
    1691           0 :                 TRC_CRITICAL(GDK, "allocating logger structure failed\n");
    1692           0 :                 fclose(fp);
    1693           0 :                 return NULL;
    1694             :         }
    1695             : 
    1696           0 :         *old_lg = (struct old_logger) {
    1697             :                 .lg = lg,
    1698             :                 .filename = logfile,
    1699             :                 .with_ids = true,
    1700             :                 .id = 1,
    1701             :         };
    1702             : 
    1703           0 :         old_lg->add = COLnew(0, TYPE_int, 0, SYSTRANS);
    1704           0 :         old_lg->del = COLnew(0, TYPE_int, 0, SYSTRANS);
    1705           0 :         if (old_lg->add == NULL || old_lg->del == NULL) {
    1706           0 :                 TRC_CRITICAL(GDK, "cannot allocate temporary bats\n");
    1707           0 :                 goto bailout;
    1708             :         }
    1709             : 
    1710           0 :         if (snprintf(filename, sizeof(filename), "%s%c%s%c", logdir, DIR_SEP, fn, DIR_SEP) >= FILENAME_MAX) {
    1711           0 :                 TRC_CRITICAL(GDK, "filename is too large\n");
    1712           0 :                 goto bailout;
    1713             :         }
    1714           0 :         if (old_lg->lg->debug & 1) {
    1715           0 :                 fprintf(stderr, "#logger_new dir set to %s\n", old_lg->lg->dir);
    1716             :         }
    1717             : 
    1718           0 :         if (logger_load(fn, filename, old_lg, fp, version) == GDK_SUCCEED) {
    1719             :                 return old_lg;
    1720             :         }
    1721             :         return NULL;
    1722             : 
    1723           0 :   bailout:
    1724           0 :         logbat_destroy(old_lg->add);
    1725           0 :         logbat_destroy(old_lg->del);
    1726           0 :         GDKfree(old_lg);
    1727           0 :         fclose(fp);
    1728           0 :         return NULL;
    1729             : }
    1730             : 
    1731             : static gdk_return
    1732           0 : old_logger_destroy(old_logger *lg)
    1733             : {
    1734           0 :         BUN p, q;
    1735           0 :         BAT *b = NULL;
    1736           0 :         const log_bid *bids;
    1737           0 :         gdk_return rc;
    1738             : 
    1739           0 :         bat *subcommit = GDKmalloc(sizeof(log_bid) * (BATcount(lg->add) + BATcount(lg->del) + 1));
    1740           0 :         if (subcommit == NULL) {
    1741           0 :                 TRC_CRITICAL(GDK, "logger_destroy failed\n");
    1742           0 :                 return GDK_FAIL;
    1743             :         }
    1744           0 :         int i = 0;
    1745           0 :         subcommit[i++] = 0;
    1746             : 
    1747           0 :         bids = (const log_bid *) Tloc(lg->add, 0);
    1748           0 :         BATloop(lg->add, p, q) {
    1749           0 :                 b = BATdescriptor(bids[p]);
    1750           0 :                 if (b) {
    1751           0 :                         b = BATsetaccess(b, BAT_READ);
    1752           0 :                         BATmode(b, false);
    1753           0 :                         BBPunfix(bids[p]);
    1754             :                 }
    1755           0 :                 subcommit[i++] = bids[p];
    1756             :         }
    1757           0 :         bids = (const log_bid *) Tloc(lg->del, 0);
    1758           0 :         BATloop(lg->del, p, q) {
    1759           0 :                 b = BATdescriptor(bids[p]);
    1760           0 :                 if (b) {
    1761           0 :                         BATmode(b, true);
    1762           0 :                         BBPunfix(bids[p]);
    1763             :                 }
    1764           0 :                 subcommit[i++] = bids[p];
    1765             :         }
    1766             :         /* give the catalog bats names so we can find them
    1767             :          * next time */
    1768           0 :         char bak[IDLENGTH];
    1769           0 :         if (BBPrename(lg->catalog_bid, NULL) < 0 ||
    1770           0 :             BBPrename(lg->catalog_nme, NULL) < 0 ||
    1771           0 :             BBPrename(lg->catalog_tpe, NULL) < 0 ||
    1772           0 :             BBPrename(lg->catalog_oid, NULL) < 0 ||
    1773           0 :             BBPrename(lg->dcatalog, NULL) < 0 ||
    1774           0 :             BBPrename(lg->snapshots_bid, NULL) < 0 ||
    1775           0 :             BBPrename(lg->snapshots_tid, NULL) < 0 ||
    1776           0 :             BBPrename(lg->dsnapshots, NULL) < 0 ||
    1777           0 :             strconcat_len(bak, sizeof(bak), lg->lg->fn, "_catalog_bid", NULL) >= sizeof(bak) ||
    1778           0 :             BBPrename(lg->lg->catalog_bid, bak) < 0 ||
    1779           0 :             strconcat_len(bak, sizeof(bak), lg->lg->fn, "_catalog_id", NULL) >= sizeof(bak) ||
    1780           0 :             BBPrename(lg->lg->catalog_id, bak) < 0 ||
    1781           0 :             strconcat_len(bak, sizeof(bak), lg->lg->fn, "_dcatalog", NULL) >= sizeof(bak) ||
    1782           0 :             BBPrename(lg->lg->dcatalog, bak) < 0) {
    1783           0 :                 GDKfree(subcommit);
    1784           0 :                 return GDK_FAIL;
    1785             :         }
    1786           0 :         if ((rc = GDKmove(0, lg->lg->dir, LOGFILE, NULL, lg->lg->dir, LOGFILE, "bak", true)) != GDK_SUCCEED) {
    1787           0 :                 TRC_CRITICAL(GDK, "logger_destroy failed\n");
    1788           0 :                 GDKfree(subcommit);
    1789           0 :                 return rc;
    1790             :         }
    1791           0 :         if ((rc = log_create_types_file(lg->lg, lg->filename, true)) != GDK_SUCCEED) {
    1792           0 :                 TRC_CRITICAL(GDK, "logger_destroy failed\n");
    1793           0 :                 GDKfree(subcommit);
    1794           0 :                 return rc;
    1795             :         }
    1796           0 :         lg->lg->id = (ulng) lg->id;
    1797           0 :         lg->lg->saved_id = lg->lg->id;
    1798           0 :         rc = TMsubcommit_list(subcommit, NULL, i, lg->lg->saved_id, lg->lg->saved_tid);
    1799           0 :         GDKfree(subcommit);
    1800           0 :         if (rc != GDK_SUCCEED) {
    1801           0 :                 TRC_CRITICAL(GDK, "logger_destroy failed\n");
    1802           0 :                 return rc;
    1803             :         }
    1804           0 :         snprintf(bak, sizeof(bak), "bak-" LLFMT, lg->id);
    1805           0 :         if ((rc = GDKmove(0, lg->lg->dir, LOGFILE, "bak", lg->lg->dir, LOGFILE, bak, true)) != GDK_SUCCEED) {
    1806           0 :                 TRC_CRITICAL(GDK, "logger_destroy failed\n");
    1807           0 :                 return rc;
    1808             :         }
    1809             : 
    1810           0 :         if (logger_cleanup(lg) != GDK_SUCCEED)
    1811           0 :                 TRC_CRITICAL(GDK, "logger_cleanup failed\n");
    1812             : 
    1813             :         /* free resources */
    1814           0 :         bids = (const log_bid *) Tloc(lg->catalog_bid, 0);
    1815           0 :         BATloop(lg->catalog_bid, p, q) {
    1816           0 :                 bat bid = bids[p];
    1817           0 :                 oid pos = p;
    1818             : 
    1819           0 :                 if (BUNfnd(lg->dcatalog, &pos) == BUN_NONE)
    1820           0 :                         BBPrelease(bid);
    1821             :         }
    1822           0 :         bids = (const log_bid *) Tloc(lg->add, 0);
    1823           0 :         BATloop(lg->add, p, q) {
    1824           0 :                 BBPrelease(bids[p]);
    1825             :         }
    1826           0 :         logbat_destroy(lg->add);
    1827           0 :         bids = (const log_bid *) Tloc(lg->del, 0);
    1828           0 :         BATloop(lg->del, p, q) {
    1829           0 :                 BBPrelease(bids[p]);
    1830             :         }
    1831           0 :         logbat_destroy(lg->del);
    1832             : 
    1833           0 :         logbat_destroy(lg->catalog_bid);
    1834           0 :         logbat_destroy(lg->catalog_nme);
    1835           0 :         logbat_destroy(lg->catalog_tpe);
    1836           0 :         logbat_destroy(lg->catalog_oid);
    1837           0 :         logbat_destroy(lg->dcatalog);
    1838           0 :         logbat_destroy(lg->freed);
    1839           0 :         logbat_destroy(lg->seqs_id);
    1840           0 :         logbat_destroy(lg->seqs_val);
    1841           0 :         logbat_destroy(lg->dseqs);
    1842           0 :         logbat_destroy(lg->snapshots_bid);
    1843           0 :         logbat_destroy(lg->snapshots_tid);
    1844           0 :         logbat_destroy(lg->dsnapshots);
    1845             : 
    1846           0 :         GDKfree(lg);
    1847           0 :         return GDK_SUCCEED;
    1848             : }
    1849             : 
    1850             : /* Create a new logger */
    1851             : gdk_return
    1852           0 : old_logger_load(logger *lg, const char *fn, const char *logdir, FILE *fp, int version, const char *filename)
    1853             : {
    1854           0 :         old_logger *old_lg;
    1855           0 :         old_lg = logger_new(lg, fn, logdir, fp, version, filename);
    1856           0 :         if (old_lg == NULL)
    1857             :                 return GDK_FAIL;
    1858           0 :         old_logger_destroy(old_lg);
    1859           0 :         return GDK_SUCCEED;
    1860             : }
    1861             : 
    1862             : /* Clean-up write-ahead log files already persisted in the BATs.
    1863             :  * Update the LOGFILE and delete all bak- files as well.
    1864             :  */
    1865             : static gdk_return
    1866           0 : logger_cleanup(old_logger *lg)
    1867             : {
    1868           0 :         char buf[BUFSIZ];
    1869           0 :         FILE *fp = NULL;
    1870             : 
    1871           0 :         if (LOG_DISABLED(lg))
    1872             :                 return GDK_SUCCEED;
    1873             : 
    1874           0 :         if (snprintf(buf, sizeof(buf), "%s%s.bak-" LLFMT, lg->lg->dir, LOGFILE, lg->id) >= (int) sizeof(buf)) {
    1875           0 :                 TRC_CRITICAL(GDK, "filename is too large\n");
    1876           0 :                 return GDK_FAIL;
    1877             :         }
    1878             : 
    1879           0 :         if (lg->lg->debug & 1) {
    1880           0 :                 fprintf(stderr, "#logger_cleanup %s\n", buf);
    1881             :         }
    1882             : 
    1883           0 :         lng lid = lg->id;
    1884             :         // remove the last persisted WAL files as well to reduce the
    1885             :         // work for the logger_cleanup_old()
    1886           0 :         if ((fp = GDKfileopen(0, NULL, buf, NULL, "r")) == NULL) {
    1887           0 :                 GDKsyserror("cannot open file %s\n", buf);
    1888           0 :                 return GDK_FAIL;
    1889             :         }
    1890             : 
    1891           0 :         while (lid-- > 0) {
    1892           0 :                 char log_id[FILENAME_MAX];
    1893             : 
    1894           0 :                 if (snprintf(log_id, sizeof(log_id), LLFMT, lid) >= (int) sizeof(log_id)) {
    1895           0 :                         TRC_CRITICAL(GDK, "log_id filename is too large\n");
    1896           0 :                         fclose(fp);
    1897           0 :                         return GDK_FAIL;
    1898             :                 }
    1899           0 :                 if (GDKunlink(0, lg->lg->dir, LOGFILE, log_id) != GDK_SUCCEED) {
    1900             :                         /* not a disaster (yet?) if unlink fails */
    1901           0 :                         TRC_ERROR(GDK, "failed to remove old WAL %s.%s\n", LOGFILE, buf);
    1902           0 :                         GDKclrerr();
    1903             :                 }
    1904             :         }
    1905           0 :         fclose(fp);
    1906             : 
    1907           0 :         if (snprintf(buf, sizeof(buf), "bak-" LLFMT, lg->id) >= (int) sizeof(buf)) {
    1908           0 :                 TRC_CRITICAL(GDK, "filename is too large\n");
    1909           0 :                 GDKclrerr();
    1910             :         }
    1911             : 
    1912           0 :         if (GDKunlink(0, lg->lg->dir, LOGFILE, buf) != GDK_SUCCEED) {
    1913             :                 /* not a disaster (yet?) if unlink fails */
    1914           0 :                 TRC_ERROR(GDK, "failed to remove old WAL %s.%s\n", LOGFILE, buf);
    1915           0 :                 GDKclrerr();
    1916             :         }
    1917             : 
    1918             :         return GDK_SUCCEED;
    1919             : }
    1920             : 
    1921             : static gdk_return
    1922           0 : logger_add_bat(old_logger *lg, BAT *b, const char *name, char tpe, oid id)
    1923             : {
    1924           0 :         log_bid bid = old_logger_find_bat(lg, name, tpe, id);
    1925           0 :         lng lid = tpe ? (lng) id : 0;
    1926             : 
    1927           0 :         assert(b->batRestricted != BAT_WRITE);
    1928           0 :         assert(b->batRole == PERSISTENT);
    1929           0 :         if (bid < 0)
    1930             :                 return GDK_FAIL;
    1931           0 :         if (bid) {
    1932           0 :                 if (bid != b->batCacheid) {
    1933           0 :                         if (logger_del_bat(lg, bid) != GDK_SUCCEED)
    1934             :                                 return GDK_FAIL;
    1935             :                 } else {
    1936             :                         return GDK_SUCCEED;
    1937             :                 }
    1938             :         }
    1939           0 :         bid = b->batCacheid;
    1940           0 :         if (lg->lg->debug & 1)
    1941           0 :                 fprintf(stderr, "#create %s\n", NAME(name, tpe, id));
    1942           0 :         assert(log_find(lg->catalog_bid, lg->dcatalog, bid) == BUN_NONE);
    1943           0 :         lg->changes += BATcount(b) + 1000;
    1944           0 :         if (BUNappend(lg->catalog_bid, &bid, true) != GDK_SUCCEED ||
    1945           0 :             BUNappend(lg->catalog_nme, name, true) != GDK_SUCCEED ||
    1946           0 :             BUNappend(lg->catalog_tpe, &tpe, true) != GDK_SUCCEED ||
    1947           0 :             BUNappend(lg->catalog_oid, &lid, true) != GDK_SUCCEED)
    1948           0 :                 return GDK_FAIL;
    1949           0 :         BBPretain(bid);
    1950           0 :         return GDK_SUCCEED;
    1951             : }
    1952             : 
    1953             : 
    1954             : static gdk_return
    1955           0 : logger_del_bat(old_logger *lg, log_bid bid)
    1956             : {
    1957           0 :         BAT *b = BATdescriptor(bid);
    1958           0 :         BUN p = log_find(lg->catalog_bid, lg->dcatalog, bid), q;
    1959           0 :         oid pos;
    1960             : 
    1961           0 :         assert(p != BUN_NONE);
    1962           0 :         if (p == BUN_NONE) {
    1963             :                 logbat_destroy(b);
    1964             :                 GDKerror("cannot find BAT\n");
    1965             :                 return GDK_FAIL;
    1966             :         }
    1967             : 
    1968             :         /* if this is a not logger commited snapshot bat, make it
    1969             :          * transient */
    1970           0 :         if (p >= lg->catalog_bid->batInserted &&
    1971           0 :             (q = log_find(lg->snapshots_bid, lg->dsnapshots, bid)) != BUN_NONE) {
    1972           0 :                 pos = (oid) q;
    1973           0 :                 if (BUNappend(lg->dsnapshots, &pos, true) != GDK_SUCCEED) {
    1974           0 :                         logbat_destroy(b);
    1975           0 :                         return GDK_FAIL;
    1976             :                 }
    1977           0 :                 if (lg->lg->debug & 1)
    1978           0 :                         fprintf(stderr,
    1979             :                                 "#logger_del_bat release snapshot %d (%d)\n",
    1980           0 :                                 bid, BBP_lrefs(bid));
    1981           0 :                 if (BUNappend(lg->freed, &bid, false) != GDK_SUCCEED) {
    1982           0 :                         logbat_destroy(b);
    1983           0 :                         return GDK_FAIL;
    1984             :                 }
    1985           0 :         } else if (p >= lg->catalog_bid->batInserted) {
    1986           0 :                 BBPrelease(bid);
    1987             :         } else {
    1988           0 :                 if (BUNappend(lg->freed, &bid, false) != GDK_SUCCEED) {
    1989           0 :                         logbat_destroy(b);
    1990           0 :                         return GDK_FAIL;
    1991             :                 }
    1992             :         }
    1993           0 :         if (b) {
    1994           0 :                 lg->changes += BATcount(b) + 1;
    1995           0 :                 BBPunfix(b->batCacheid);
    1996             :         }
    1997           0 :         pos = (oid) p;
    1998           0 :         return BUNappend(lg->dcatalog, &pos, true);
    1999             : /*assert(BBP_lrefs(bid) == 0);*/
    2000             : }

Generated by: LCOV version 1.14