LCOV - code coverage report
Current view: top level - sql/storage/bat - bat_storage.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 2473 3160 78.3 %
Date: 2024-04-25 23:25:41 Functions: 156 163 95.7 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "bat_storage.h"
      15             : #include "bat_utils.h"
      16             : #include "sql_string.h"
      17             : #include "gdk_atoms.h"
      18             : #include "gdk_atoms.h"
      19             : #include "matomic.h"
      20             : 
      21             : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
      22             : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
      23             : 
      24             : static int log_update_col( sql_trans *tr, sql_change *c);
      25             : static int log_update_idx( sql_trans *tr, sql_change *c);
      26             : static int log_update_del( sql_trans *tr, sql_change *c);
      27             : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      28             : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      29             : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      30             : static int log_create_col(sql_trans *tr, sql_change *change);
      31             : static int log_create_idx(sql_trans *tr, sql_change *change);
      32             : static int log_create_del(sql_trans *tr, sql_change *change);
      33             : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      34             : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      35             : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      36             : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
      37             : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
      38             : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
      39             : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
      40             : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
      41             : 
      42             : static void merge_delta( sql_delta *obat);
      43             : 
      44             : /* valid
      45             :  * !deleted && VALID_4_READ(TS, tr)                             existing or newly created segment
      46             :  *  deleted && TS > tr->ts && OLDTS < tr->ts                deleted after current transaction
      47             :  */
      48             : 
      49             : #define VALID_4_READ(TS,tr) \
      50             :         (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
      51             : 
      52             : /* when changed, check if the old status is still valid */
      53             : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
      54             :                 (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
      55             : 
      56             : #define SEG_VALID_4_DELETE(seg,tr) \
      57             :         (!seg->deleted && VALID_4_READ(seg->ts, tr))
      58             : 
      59             : /* Deleted (in current trans or by some other finished transaction, or re-used segment which used to be deleted) */
      60             : #define SEG_IS_DELETED(seg,tr) \
      61             :         ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
      62             :          (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
      63             : 
      64             : /* A segment is part of the current transaction in some way, or is deleted by some other transaction but used to be valid */
      65             : #define SEG_IS_VALID(seg, tr) \
      66             :                 ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
      67             :                  (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
      68             : 
      69             : static inline BAT *
      70        5251 : transfer_to_systrans(BAT *b)
      71             : {
      72             :         /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
      73        5251 :         MT_lock_set(&b->theaplock);
      74        5251 :         if (VIEWtparent(b) || VIEWvtparent(b)) {
      75          19 :                 MT_lock_unset(&b->theaplock);
      76          19 :                 BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
      77          19 :                 BBPreclaim(b);
      78          19 :                 return bn;
      79             :         }
      80        5232 :         if (b->theap->farmid == TRANSIENT ||
      81          14 :                 (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
      82        4839 :                 QryCtx *qc = MT_thread_get_qry_ctx();
      83        4839 :                 if (qc) {
      84        2463 :                         if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
      85        2463 :                                 ATOMIC_SUB(&qc->datasize, b->theap->size);
      86        2463 :                                 b->theap->farmid = SYSTRANS;
      87        2463 :                                 b->batRole = SYSTRANS;
      88             :                         }
      89        2463 :                         if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
      90        1068 :                                 ATOMIC_SUB(&qc->datasize, b->tvheap->size);
      91        1068 :                                 b->tvheap->farmid = SYSTRANS;
      92             :                         }
      93             :                 }
      94             :         }
      95        5232 :         MT_lock_unset(&b->theaplock);
      96        5232 :         return b;
      97             : }
      98             : 
      99             : static void
     100    25064188 : lock_table(sqlstore *store, sqlid id)
     101             : {
     102    25064188 :         MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     103    25067545 : }
     104             : 
     105             : static void
     106    25067388 : unlock_table(sqlstore *store, sqlid id)
     107             : {
     108    25067388 :         MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     109    25067507 : }
     110             : 
     111             : static void
     112    20069435 : lock_column(sqlstore *store, sqlid id)
     113             : {
     114    20069435 :         MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     115    20068904 : }
     116             : 
     117             : static void
     118    20069330 : unlock_column(sqlstore *store, sqlid id)
     119             : {
     120    20069330 :         MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     121    20068936 : }
     122             : 
     123             : static void
     124      112982 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     125             : {
     126      112982 :         assert(cleanup);
     127      112982 :         trans_add(tr, dup_base(b), data, cleanup, commit, log);
     128      112982 : }
     129             : 
     130             : static void
     131      132948 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     132             : {
     133      132948 :         assert(cleanup);
     134      132948 :         dup_base(&t->base);
     135      132947 :         trans_add(tr, b, data, cleanup, commit, log);
     136      132940 : }
     137             : 
     138             : static int
     139       79827 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
     140             : {
     141       79827 :         segment *s = change->data;
     142             : 
     143       79827 :         if (s->ts <= oldest) {
     144       33141 :                 while(s) {
     145       21076 :                         segment *n = s->prev;
     146       21076 :                         ATOMIC_PTR_DESTROY(&s->next);
     147       21076 :                         _DELETE(s);
     148       21076 :                         s = n;
     149             :                 }
     150       12065 :                 sqlstore *store = Store;
     151       12065 :                 table_destroy(store, (sql_table*)change->obj);
     152       12065 :                 return 1;
     153             :         }
     154             :         return LOG_OK;
     155             : }
     156             : 
     157             : static void
     158       21076 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
     159             : {
     160             :         /* we can only be accessed by anything older than commit_ts */
     161       21076 :         if (c->cleanup == &tc_gc_seg)
     162        9011 :                 s->prev = c->data;
     163             :         else
     164       12065 :                 c->cleanup = &tc_gc_seg;
     165       21076 :         c->data = s;
     166       21076 :         s->ts = commit_ts;
     167       16676 : }
     168             : 
     169             : static segment *
     170       87424 : new_segment(segment *o, sql_trans *tr, size_t cnt)
     171             : {
     172       87424 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     173             : 
     174       87424 :         assert(tr);
     175       87424 :         if (n) {
     176       87424 :                 *n = (segment) {
     177       87424 :                         .ts = tr->tid,
     178             :                         .oldts = 0,
     179             :                         .deleted = false,
     180             :                         .start = 0,
     181             :                         .end = cnt,
     182             :                         .next = ATOMIC_PTR_VAR_INIT(NULL),
     183             :                         .prev = NULL,
     184             :                 };
     185       87424 :                 if (o) {
     186       36064 :                         n->start += o->end;
     187       36064 :                         n->end += o->end;
     188       36064 :                         ATOMIC_PTR_SET(&o->next, n);
     189             :                 }
     190             :         }
     191       87424 :         return n;
     192             : }
     193             : 
     194             : static segment *
     195       90265 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
     196             : {
     197       90265 :         assert(tr);
     198       90265 :         if (o->start == start && o->end == start+cnt) {
     199       10558 :                 assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
     200       10558 :                 o->oldts = o->ts;
     201       10558 :                 o->ts = tr->tid;
     202       10558 :                 o->deleted = deleted;
     203       10558 :                 return o;
     204             :         }
     205       79707 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     206             : 
     207       79707 :         if (!n)
     208             :                 return NULL;
     209       79707 :         n->prev = NULL;
     210             : 
     211       79707 :         if (o->ts == tr->tid) {
     212        4020 :                 n->oldts = 0;
     213        4020 :                 n->ts = 1;
     214        4020 :                 n->deleted = true;
     215             :         } else {
     216       75687 :                 n->oldts = o->ts;
     217       75687 :                 n->ts = tr->tid;
     218       75687 :                 n->deleted = deleted;
     219             :         }
     220       79707 :         if (start == o->start) {
     221             :                 /* 2-way split: o remains latter part of segment, new one is
     222             :                  * inserted before */
     223       64742 :                 n->start = o->start;
     224       64742 :                 n->end = n->start + cnt;
     225       64742 :                 ATOMIC_PTR_INIT(&n->next, o);
     226       64742 :                 if (segs->h == o)
     227         461 :                         segs->h = n;
     228       64742 :                 if (p)
     229       64281 :                         ATOMIC_PTR_SET(&p->next, n);
     230       64742 :                 o->start = n->end;
     231       14965 :         } else if (start+cnt == o->end) {
     232             :                 /* 2-way split: o remains first part of segment, new one is
     233             :                  * added after */
     234        5462 :                 n->start = o->end - cnt;
     235        5462 :                 n->end = o->end;
     236        5462 :                 ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
     237        5462 :                 ATOMIC_PTR_SET(&o->next, n);
     238        5462 :                 if (segs->t == o)
     239         812 :                         segs->t = n;
     240        5462 :                 o->end = n->start;
     241             :         } else {
     242             :                 /* 3-way split: o remains first part of segment, two new ones
     243             :                  * are added after */
     244        9503 :                 segment *n2 = GDKmalloc(sizeof(segment));
     245        9503 :                 if (n2 == NULL) {
     246           0 :                         GDKfree(n);
     247           0 :                         return NULL;
     248             :                 }
     249        9503 :                 ATOMIC_PTR_INIT(&n->next, n2);
     250        9503 :                 n->start = start;
     251        9503 :                 n->end = start + cnt;
     252        9503 :                 *n2 = *o;
     253        9503 :                 ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
     254        9503 :                 n2->start = n->end;
     255        9503 :                 n2->prev = NULL;
     256        9503 :                 if (segs->t == o)
     257        3495 :                         segs->t = n2;
     258        9503 :                 ATOMIC_PTR_SET(&o->next, n);
     259        9503 :                 o->end = start;
     260             :         }
     261             :         return n;
     262             : }
     263             : 
     264             : static void
     265        4188 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
     266             : {
     267        4188 :         segment *cur = segs->h, *seg = NULL;
     268       18144 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     269       13956 :                 if (cur->ts == tr->tid) { /* revert */
     270        4685 :                         cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
     271        4685 :                         cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
     272        4685 :                         cur->oldts = 0;
     273             :                 }
     274       13956 :                 if (cur->ts <= oldest) { /* possibly merge range */
     275       13070 :                         if (!seg) { /* skip first */
     276             :                                 seg = cur;
     277        8882 :                         } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
     278             :                                 /* merge with previous */
     279        4400 :                                 seg->end = cur->end;
     280        4400 :                                 ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     281        4400 :                                 if (cur == segs->t)
     282        2849 :                                         segs->t = seg;
     283        4400 :                                 mark4destroy(cur, change, store_get_timestamp(tr->store));
     284        4400 :                                 cur = seg;
     285             :                         } else {
     286             :                                 seg = cur; /* begin of new merge */
     287             :                         }
     288             :                 }
     289             :         }
     290        4188 : }
     291             : 
     292             : static size_t
     293      102784 : segs_end_include_deleted( segments *segs, sql_trans *tr)
     294             : {
     295      102784 :         size_t cnt = 0;
     296      102784 :         segment *s = segs->h, *l = NULL;
     297             : 
     298      509545 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     299      406761 :                 if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
     300             :                                 l = s;
     301             :         }
     302      102784 :         if (l)
     303      102777 :                 cnt = l->end;
     304      102784 :         return cnt;
     305             : }
     306             : 
     307             : static int
     308      102784 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
     309             : {
     310             :         /* set bits correctly */
     311      102784 :         BAT *b = temp_descriptor(cs->bid);
     312             : 
     313      102784 :         if (!b)
     314             :                 return LOG_ERR;
     315      102784 :         segment *s = segs->h;
     316             : 
     317      102784 :         size_t nr = segs_end_include_deleted(segs, tr);
     318      102784 :         size_t rounded_nr = ((nr+31)&~31);
     319      102784 :         if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
     320           0 :                 bat_destroy(b);
     321           0 :                 return LOG_ERR;
     322             :         }
     323             : 
     324             :         /* disable all properties here */
     325      102784 :         MT_lock_set(&b->theaplock);
     326      102784 :         b->tsorted = false;
     327      102784 :         b->trevsorted = false;
     328      102784 :         b->tnosorted = 0;
     329      102784 :         b->tnorevsorted = 0;
     330      102784 :         b->tseqbase = oid_nil;
     331      102784 :         b->tkey = false;
     332      102784 :         b->tnokey[0] = 0;
     333      102784 :         b->tnokey[1] = 0;
     334      102784 :         b->theap->dirty = true;
     335      102784 :         BUN cnt = BATcount(b);
     336      102784 :         MT_lock_unset(&b->theaplock);
     337             : 
     338      102784 :         uint32_t *restrict dst;
     339             :         /* why hashlock ?? */
     340      102784 :         MT_rwlock_wrlock(&b->thashlock);
     341      541429 :         for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
     342      380513 :                 if (s->start >= nr)
     343             :                         break;
     344      335861 :                 if (s->ts == tr->tid && s->end != s->start) {
     345      147469 :                         if (cnt < s->start) { /* first mark as deleted ! */
     346        3829 :                                 size_t lnr = s->start-cnt;
     347        3829 :                                 size_t pos = cnt;
     348        3829 :                                 dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     349        3829 :                                 uint32_t cur = 0;
     350             : 
     351        3829 :                                 size_t used = pos&31, end = 32;
     352        3829 :                                 if (used) {
     353        3702 :                                         if (lnr < (32-used))
     354        3504 :                                                 end = used + lnr;
     355        3702 :                                         assert(end > used);
     356        3702 :                                         cur |= ((1U << (end - used)) - 1) << used;
     357        3702 :                                         lnr -= end - used;
     358        3702 :                                         *dst++ |= cur;
     359        3702 :                                         cur = 0;
     360             :                                 }
     361        3829 :                                 size_t full = lnr/32;
     362        3829 :                                 size_t rest = lnr%32;
     363        3829 :                                 if (full > 0) {
     364           8 :                                         memset(dst, ~0, full * sizeof(*dst));
     365           8 :                                         dst += full;
     366           8 :                                         lnr -= full * 32;
     367             :                                 }
     368        3829 :                                 if (rest > 0) {
     369         210 :                                         cur |= (1U << rest) - 1;
     370         210 :                                         lnr -= rest;
     371         210 :                                         *dst |= cur;
     372             :                                 }
     373        3829 :                                 assert(lnr==0);
     374             :                         }
     375      147469 :                         size_t lnr = s->end-s->start;
     376      147469 :                         size_t pos = s->start;
     377      147469 :                         dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     378      147469 :                         uint32_t cur = 0;
     379      147469 :                         size_t used = pos&31, end = 32;
     380      147469 :                         if (used) {
     381      109525 :                                 if (lnr < (32-used))
     382      103469 :                                         end = used + lnr;
     383      109525 :                                 assert(end > used);
     384      109525 :                                 cur |= ((1U << (end - used)) - 1) << used;
     385      109525 :                                 lnr -= end - used;
     386      109525 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     387      109525 :                                 dst++;
     388      109525 :                                 cur = 0;
     389             :                         }
     390      147469 :                         size_t full = lnr/32;
     391      147469 :                         size_t rest = lnr%32;
     392      147469 :                         if (full > 0) {
     393        3588 :                                 memset(dst, s->deleted?~0:0, full * sizeof(*dst));
     394        3588 :                                 dst += full;
     395        3588 :                                 lnr -= full * 32;
     396             :                         }
     397      147469 :                         if (rest > 0) {
     398       40924 :                                 cur |= (1U << rest) - 1;
     399       40924 :                                 lnr -= rest;
     400       40924 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     401             :                         }
     402      147469 :                         assert(lnr==0);
     403      147469 :                         if (cnt < s->end)
     404      335861 :                                 cnt = s->end;
     405             :                 }
     406             :         }
     407      102784 :         MT_rwlock_wrunlock(&b->thashlock);
     408      102784 :         if (nr > BATcount(b)) {
     409       60674 :                 MT_lock_set(&b->theaplock);
     410       60674 :                 BATsetcount(b, nr);
     411       60674 :                 MT_lock_unset(&b->theaplock);
     412             :         }
     413             : 
     414      102784 :         bat_destroy(b);
     415      102784 :         return LOG_OK;
     416             : }
     417             : 
     418             : /* TODO return LOG_OK/ERR */
     419             : static void
     420      102810 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     421             : {
     422      102810 :         sqlstore* store = tr->store;
     423      102810 :         segment *cur = s->segs->h, *seg = NULL;
     424      509643 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     425      406833 :                 if (cur->ts == tr->tid) {
     426      161642 :                         if (!cur->deleted)
     427       91561 :                                 cur->oldts = 0;
     428      161642 :                         cur->ts = commit_ts;
     429             :                 }
     430      406833 :                 if (!seg) {
     431             :                         /* first segment */
     432             :                         seg = cur;
     433             :                 }
     434      304023 :                 else if (seg->ts < TRANSACTION_ID_BASE) {
     435             :                         /* possible merge since both deleted flags are equal */
     436      274621 :                         if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
     437      204708 :                                 int merge = 1;
     438      204708 :                                 node *n = store->active->h;
     439      657610 :                                 for (int i = 0; i < store->active->cnt; i++, n = n->next) {
     440      558063 :                                         sql_trans* other = ((sql_trans*)n->data);
     441      558063 :                                         ulng active = other->ts;
     442      558063 :                                         if(other->active == 2)
     443       23077 :                                                 continue; /* pretend that another recently committed transaction is no longer active */
     444      534986 :                                         if (active == tr->ts)
     445      137423 :                                                 continue; /* pretend that committing transaction has already committed and is no longer active */
     446      397563 :                                         if (seg->ts < active && cur->ts < active)
     447             :                                                 break;
     448      384590 :                                         if (seg->ts > active && cur->ts > active)
     449      292402 :                                                 continue;
     450             : 
     451       92188 :                                         assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
     452             :                                         /* cannot safely merge since there is an active transaction between the segments */
     453             :                                         merge = false;
     454             :                                         break;
     455             :                                 }
     456             :                                 /* merge segments */
     457      225040 :                                 if (merge) {
     458      112520 :                                         seg->end = cur->end;
     459      112520 :                                         ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     460      112520 :                                         if (cur == s->segs->t)
     461       26533 :                                                 s->segs->t = seg;
     462      112520 :                                         if (commit_ts == oldest) {
     463       95844 :                                                 ATOMIC_PTR_DESTROY(&cur->next);
     464       95844 :                                                 _DELETE(cur);
     465             :                                         } else
     466       33352 :                                                 mark4destroy(cur, change, commit_ts);
     467      112520 :                                         cur = seg;
     468      112520 :                                         continue;
     469             :                                 }
     470             :                         }
     471             :                 }
     472             :                 seg = cur;
     473             :         }
     474      102810 : }
     475             : 
     476             : static int
     477     2167121 : segments_in_transaction(sql_trans *tr, sql_table *t)
     478             : {
     479     2167121 :         storage *s = ATOMIC_PTR_GET(&t->data);
     480     2167121 :         segment *seg = s->segs->h;
     481             : 
     482     2167121 :         if (seg && s->segs->t->ts == tr->tid)
     483             :                 return 1;
     484      683700 :         for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
     485      579483 :                 if (seg->ts == tr->tid)
     486             :                         return 1;
     487             :         }
     488             :         return 0;
     489             : }
     490             : 
     491             : static size_t
     492    17294105 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
     493             : {
     494    17294105 :         size_t cnt = 0;
     495             : 
     496    17294105 :         lock_table(tr->store, table->base.id);
     497    17297103 :         segment *s = segs->h, *l = NULL;
     498             : 
     499    17297103 :         if (segs->t && SEG_IS_VALID(segs->t, tr))
     500    15079108 :                 l = s = segs->t;
     501             : 
     502   211025937 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     503   193728976 :                 if (SEG_IS_VALID(s, tr))
     504             :                                 l = s;
     505             :         }
     506    17296961 :         if (l)
     507    17278663 :                 cnt = l->end;
     508    17296961 :         unlock_table(tr->store, table->base.id);
     509    17297031 :         return cnt;
     510             : }
     511             : 
     512             : static segments *
                       /*
                        * Allocate a segments list holding a single segment created by
                        * new_segment(NULL, tr, cnt); head and tail both point at it and the
                        * reference counter is initialised.
                        * Returns NULL when either allocation fails.
                        */
     513       51360 : new_segments(sql_trans *tr, size_t cnt)
     514             : {
     515       51360 :         segments *n = (segments*)GDKmalloc(sizeof(segments));
     516             : 
     517       51360 :         if (n) {
     518       51360 :                 n->h = n->t = new_segment(NULL, tr, cnt);
     519       51360 :                 if (!n->h) {
                                               /* initial segment could not be created: undo the list allocation */
     520           0 :                         GDKfree(n);
     521           0 :                         return NULL;
     522             :                 }
     523       51360 :                 sql_ref_init(&n->r);
     524             :         }
     525             :         return n;
     526             : }
     527             : 
     528             : static sql_delta *
                       /*
                        * Follow d->next until a delta passes VALID_4_READ(d->cs.ts, tr) or the
                        * chain ends; the last delta of the chain is returned even when it does
                        * not pass the test.
                        * NOTE(review): d is dereferenced without a NULL check, unlike
                        * timestamp_storage() - callers must pass a non-NULL delta.
                        */
     529    30331033 : timestamp_delta( sql_trans *tr, sql_delta *d)
     530             : {
     531    30378713 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     532       47680 :                 d = d->next;
     533    30331141 :         return d;
     534             : }
     535             : 
     536             : static sql_delta *
                       /* Apply timestamp_delta() to the delta chain currently stored in c->data. */
     537    30176802 : col_timestamp_delta( sql_trans *tr, sql_column *c)
     538             : {
     539    30176802 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
     540             : }
     541             : 
     542             : static sql_delta *
                       /*
                        * Apply timestamp_delta() to the delta chain currently stored in i->data.
                        * NOTE(review): idx_dup() treats i->data as possibly NULL, while
                        * timestamp_delta() dereferences its argument unconditionally - confirm
                        * that callers only reach this for indices that have data.
                        */
     543       21614 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
     544             : {
     545       21614 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
     546             : }
     547             : 
     548             : static storage *
                       /*
                        * Storage counterpart of timestamp_delta(): follow d->next until a
                        * storage passes VALID_4_READ(d->cs.ts, tr) or the chain ends.
                        * Returns NULL when d is NULL.
                        */
     549    18302681 : timestamp_storage( sql_trans *tr, storage *d)
     550             : {
     551    18302681 :         if (!d)
     552             :                 return NULL;
     553    18366589 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     554       63908 :                 d = d->next;
     555             :         return d;
     556             : }
     557             : 
     558             : static storage *
                       /* Apply timestamp_storage() to the storage chain currently stored in t->data. */
     559    18278005 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
     560             : {
     561    18278005 :         return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
     562             : }
     563             : 
     564             : static sql_delta*
                       /* Atomically increment the reference count of d (d->cs.refcnt) and return d. */
     565       19347 : delta_dup(sql_delta *d)
     566             : {
     567       19347 :         ATOMIC_INC(&d->cs.refcnt);
     568       19347 :         return d;
     569             : }
     570             : 
     571             : static void *
                       /* Take an extra reference on the delta stored in c->data and return it. */
     572       17953 : col_dup(sql_column *c)
     573             : {
     574       17953 :         return delta_dup(ATOMIC_PTR_GET(&c->data));
     575             : }
     576             : 
     577             : static void *
                       /*
                        * Take an extra reference on the delta stored in i->data and return it;
                        * returns NULL when the index has no data.
                        */
     578        2681 : idx_dup(sql_idx *i)
     579             : {
     580        2681 :         if (!ATOMIC_PTR_GET(&i->data))
     581             :                 return NULL;
     582        1394 :         return delta_dup(ATOMIC_PTR_GET(&i->data));
     583             : }
     584             : 
     585             : static storage*
                       /* Atomically increment the reference count of d (d->cs.refcnt) and return d. */
     586        1531 : storage_dup(storage *d)
     587             : {
     588        1531 :         ATOMIC_INC(&d->cs.refcnt);
     589        1531 :         return d;
     590             : }
     591             : 
     592             : static void *
                       /* Take an extra reference on the storage kept in t->data and return it. */
     593        1531 : del_dup(sql_table *t)
     594             : {
     595        1531 :         return storage_dup(ATOMIC_PTR_GET(&t->data));
     596             : }
     597             : 
     598             : static size_t
                       /*
                        * Sum the lengths (end - start) of all segments from s onwards that are
                        * not marked deleted and whose ts equals tr->tid.
                        */
     599          17 : count_inserts( segment *s, sql_trans *tr)
     600             : {
     601          17 :         size_t cnt = 0;
     602             : 
     603          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     604          55 :                 if (!s->deleted && s->ts == tr->tid)
     605           4 :                         cnt += s->end - s->start;
     606             :         }
     607          17 :         return cnt;
     608             : }
     609             : 
     610             : static size_t
                       /*
                        * Sum the lengths of the segments overlapping [start, end) for which
                        * SEG_IS_DELETED(s, tr) holds.  Whole segments are counted: a segment
                        * that straddles `start` or `end` contributes its full length.
                        */
     611      852152 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
     612             : {
     613      852152 :         size_t cnt = 0;
     614             : 
                               /* skip the segments that end at or before `start` */
     615      997409 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
     616             :                 ;
     617             : 
     618     4949818 :         for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
     619     4097665 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
     620     1325531 :                         cnt += s->end - s->start;
     621             :         }
     622      852153 :         return cnt;
     623             : }
     624             : 
     625             : static size_t
                       /*
                        * Sum the lengths (end - start) of all segments from s onwards for which
                        * SEG_IS_DELETED(s, tr) holds.
                        */
     626          17 : count_deletes( segment *s, sql_trans *tr)
     627             : {
     628          17 :         size_t cnt = 0;
     629             : 
     630          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     631          55 :                 if (SEG_IS_DELETED(s, tr))
     632          17 :                         cnt += s->end - s->start;
     633             :         }
     634          17 :         return cnt;
     635             : }
     636             : 
     637             : #define CNT_ACTIVE 10
                       /* `access` value for count_col(): valid end position minus the deleted rows */
     638             : 
     639             : static size_t
                       /*
                        * Row/update count for column c as seen by tr; the meaning depends on
                        * `access`:
                        *   2          - number of updates recorded in the column delta (cs.ucnt)
                        *   1          - rows in non-deleted segments with ts == tr->tid
                        *   QUICK      - `end` of the tail segment, read without the table lock
                        *   CNT_ACTIVE - segs_end() minus the deleted rows below that position
                        *   otherwise  - segs_end()
                        * Returns 0 when c->t is not a table or no storage/delta is found.
                        */
     640    16782724 : count_col(sql_trans *tr, sql_column *c, int access)
     641             : {
     642    16782724 :         storage *d;
     643    16782724 :         sql_delta *ds;
     644             : 
     645    16782724 :         if (!isTable(c->t))
     646             :                 return 0;
     647    16782724 :         d = tab_timestamp_storage(tr, c->t);
     648    16780636 :         ds = col_timestamp_delta(tr, c);
     649    16780854 :         if (!d ||!ds)
     650             :                 return 0;
                               /* NOTE(review): ds is known non-NULL here, so the `ds?` test below is redundant */
     651    16780854 :         if (access == 2)
     652      458324 :                 return ds?ds->cs.ucnt:0;
     653    16322530 :         if (access == 1)
     654          17 :                 return count_inserts(d->segs->h, tr);
     655    16322513 :         if (access == QUICK)
     656      533328 :                 return d->segs->t?d->segs->t->end:0;
     657    15789185 :         if (access == CNT_ACTIVE) {
                                       /* segs_end() takes and releases the table lock itself; the lock is
                                        * re-acquired below, so the two steps are not one critical section */
     658      852121 :                 size_t cnt = segs_end(d->segs, tr, c->t);
     659      852156 :                 lock_table(tr->store, c->t->base.id);
     660      852151 :                 cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
     661      852152 :                 unlock_table(tr->store, c->t->base.id);
     662      852152 :                 return cnt;
     663             :         }
     664    14937064 :         return segs_end(d->segs, tr, c->t);
     665             : }
     666             : 
     667             : static size_t
                       /*
                        * Index counterpart of count_col(): `access` 2 yields the delta's update
                        * count, 1 the rows inserted by tr, QUICK the tail segment's `end`, and
                        * anything else segs_end().
                        * Returns 0 when i->t is not a table, when the index is a hash index over
                        * at most one column, when idx_has_column(i->type) is false, or when no
                        * storage/delta is found.
                        */
     668       18831 : count_idx(sql_trans *tr, sql_idx *i, int access)
     669             : {
     670       18831 :         storage *d;
     671       18831 :         sql_delta *ds;
     672             : 
     673       18831 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
     674        4631 :                 return 0;
     675       14200 :         d = tab_timestamp_storage(tr, i->t);
     676       14198 :         ds = idx_timestamp_delta(tr, i);
     677       14199 :         if (!d || !ds)
     678             :                 return 0;
                               /* NOTE(review): ds is known non-NULL here, so the `ds?` test below is redundant */
     679       14199 :         if (access == 2)
     680        2855 :                 return ds?ds->cs.ucnt:0;
     681       11344 :         if (access == 1)
     682           0 :                 return count_inserts(d->segs->h, tr);
     683       11344 :         if (access == QUICK)
     684        3528 :                 return d->segs->t?d->segs->t->end:0;
     685        7816 :         return segs_end(d->segs, tr, i->t);
     686             : }
     687             : 
     688             : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
                       /* True when b has a void tail with a non-nil seqbase.
                        * NOTE(review): the macro argument is not parenthesised; pass plain
                        * identifiers only. */
     689             : static BAT *
                       /*
                        * Return the update BAT of cs: the update ids for RD_UPD_ID, the update
                        * values for RD_UPD_VAL.  When cs has no updates (any of uibid, uvbid or
                        * ucnt is zero) an empty BAT of the matching type is returned instead.
                        * For RD_UPD_ID the ids are cut down to those below cnt when the id BAT
                        * is unsorted or may contain ids >= cnt.
                        * Returns NULL on failure.
                        * NOTE(review): only the id side is filtered against cnt; the RD_UPD_VAL
                        * side returns the value BAT unfiltered - confirm callers cope with ids
                        * and values of different length.
                        */
     690    13346029 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
     691             : {
     692    13346029 :         BAT *b;
     693             : 
     694    13346029 :         assert(access == RD_UPD_ID || access == RD_UPD_VAL);
     695             :         /* returns the updates for cs */
     696    13346029 :         if (cs->uibid && cs->uvbid && cs->ucnt) {
     697        7670 :                 if (access == RD_UPD_ID) {
     698        4942 :                         if (!(b = temp_descriptor(cs->uibid)))
     699             :                                 return NULL;
                                               /* filter when unsorted, or when the dense range / the last id
                                                * reaches cnt or beyond */
     700        4942 :                         if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
     701         912 :                            (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
     702        4030 :                                         oid nil = oid_nil;
     703             :                                         /* less then cnt */
     704        4030 :                                         BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false);
     705        4030 :                                         if (!s) {
     706           0 :                                                 bat_destroy(b);
     707           0 :                                                 return NULL;
     708             :                                         }
     709             : 
                                                       /* a failed BATproject leaves nb NULL, which is then returned */
     710        4030 :                                         BAT *nb = BATproject(s, b);
     711        4030 :                                         bat_destroy(s);
     712        4030 :                                         bat_destroy(b);
     713        4030 :                                         b = nb;
     714             :                         }
     715             :                 } else {
     716        2728 :                         b = temp_descriptor(cs->uvbid);
     717             :                 }
     718             :         } else {
     719    22230553 :                 b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
     720             :         }
     721             :         return b;
     722             : }
     723             : 
     724             : static BAT *
                       /*
                        * Merge two update sets into one: (ui, *UV) are the update ids/values
                        * that take precedence, (oi, ov) the other set.  The id lists are merged
                        * in ascending order; when both sides hold the same id only the (ui, uv)
                        * entry is kept.  The merge loop relies on both id lists being ascending.
                        *
                        * Except on the "full overlap" shortcut, where ui is handed back as is,
                        * all four input BATs are released with bat_destroy().  On success the
                        * merged id BAT is returned and, when values were merged, *UV is replaced
                        * by the merged value BAT.  Returns NULL on failure.
                        *
                        * This function has zero recorded hits in this coverage run.
                        *
                        * NOTE(review): uvi/ovi are initialised only when uv is non-NULL, but the
                        * BUNappend guards below test `ov`, also on the branches that read uvi.
                        * This is only safe if uv and ov are always both NULL or both non-NULL -
                        * confirm against the caller.
                        */
     725           0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
     726             : {
     727           0 :         int err = 0;
     728           0 :         BAT *uv = *UV;
     729           0 :         BUN cnt = BATcount(ui)+BATcount(oi);
     730           0 :         BATiter uvi;
     731           0 :         BATiter ovi;
     732             : 
     733           0 :         if (uv) {
     734           0 :                 uvi = bat_iterator(uv);
     735           0 :                 ovi = bat_iterator(ov);
     736             :         }
     737             : 
     738             :         /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
     739           0 :         BUN uip = 0, uie = BATcount(ui);
     740           0 :         BUN oip = 0, oie = BATcount(oi);
     741             : 
     742           0 :         oid uiseqb = ui->tseqbase;
     743           0 :         oid oiseqb = oi->tseqbase;
     744           0 :         oid *uipt = NULL, *oipt = NULL;
     745           0 :         BATiter uii = bat_iterator(ui);
     746           0 :         BATiter oii = bat_iterator(oi);
                               /* uipt/oipt stay NULL for dense id BATs; ids are then computed as seqbase + position */
     747           0 :         if (!BATtdensebi(&uii))
     748           0 :                 uipt = uii.base;
     749           0 :         if (!BATtdensebi(&oii))
     750           0 :                 oipt = oii.base;
     751             : 
                               /* NOTE(review): this shortcut compares only seqbase and count - presumably
                                * meant for two dense id BATs; verify it cannot trigger for non-dense ones */
     752           0 :         if (uiseqb == oiseqb && uie == oie) { /* full overlap, no values */
     753           0 :                 if (uv) {
     754           0 :                         bat_iterator_end(&uvi);
     755           0 :                         bat_iterator_end(&ovi);
     756             :                 }
     757           0 :                 bat_iterator_end(&uii);
     758           0 :                 bat_iterator_end(&oii);
                                       /* NOTE(review): in the else branch uv is NULL, so that bat_destroy(uv)
                                        * is handed a NULL pointer; *UV = uv stores back the value it already had */
     759           0 :                 if (uv) {
     760           0 :                         *UV = uv;
     761             :                 } else {
     762           0 :                         bat_destroy(uv);
     763             :                 }
     764           0 :                 bat_destroy(oi);
     765           0 :                 bat_destroy(ov);
     766           0 :                 return ui;
     767             :         }
     768           0 :         BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
     769           0 :         BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
     770             : 
                               /* NOTE(review): this failure path returns without calling
                                * bat_iterator_end() on the iterators opened above */
     771           0 :         if (!ni || (uv && !nv)) {
     772           0 :                 bat_destroy(ni);
     773           0 :                 bat_destroy(nv);
     774           0 :                 bat_destroy(ui);
     775           0 :                 bat_destroy(uv);
     776           0 :                 bat_destroy(oi);
     777           0 :                 bat_destroy(ov);
     778           0 :                 return NULL;
     779             :         }
                               /* two-way merge while both sides still have entries */
     780           0 :         while (uip < uie && oip < oie && !err) {
     781           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     782           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     783             : 
     784           0 :                 if (uiid <= oiid) {
     785           0 :                         if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     786           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     787             :                                 err = 1;
     788           0 :                         uip++;
                                               /* same id on both sides: the (oi, ov) entry is dropped */
     789           0 :                         if (uiid == oiid)
     790           0 :                                 oip++;
     791             :                 } else { /* uiid > oiid */
     792           0 :                         if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     793           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     794             :                                 err = 1;
     795           0 :                         oip++;
     796             :                 }
     797             :         }
                               /* copy whatever is left on the (ui, uv) side */
     798           0 :         while (uip < uie && !err) {
     799           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     800           0 :                 if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     801           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     802             :                         err = 1;
     803           0 :                 uip++;
     804             :         }
                               /* copy whatever is left on the (oi, ov) side */
     805           0 :         while (oip < oie && !err) {
     806           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     807           0 :                 if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     808           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     809             :                         err = 1;
     810           0 :                 oip++;
     811             :         }
     812           0 :         if (uv) {
     813           0 :                 bat_iterator_end(&uvi);
     814           0 :                 bat_iterator_end(&ovi);
     815             :         }
     816           0 :         bat_iterator_end(&uii);
     817           0 :         bat_iterator_end(&oii);
                               /* the inputs are consumed whether or not the merge succeeded */
     818           0 :         bat_destroy(ui);
     819           0 :         bat_destroy(uv);
     820           0 :         bat_destroy(oi);
     821           0 :         bat_destroy(ov);
     822           0 :         if (!err) {
     823           0 :                 if (nv)
     824           0 :                         *UV = nv;
     825           0 :                 return ni;
     826             :         }
                               /* append failed: drop the partial results and clear the caller's value BAT */
     827           0 :         *UV = NULL;
     828           0 :         bat_destroy(ni);
     829           0 :         bat_destroy(nv);
     830           0 :         return NULL;
     831             : }
     832             : 
     833             : static sql_delta *
                       /*
                        * Starting at d->next, return the first delta that has updates
                        * (cs.ucnt != 0) and passes VALID_4_READ(cs.ts, tr).  The search stops at
                        * the first delta flagged cs.merged or at the end of the chain, in which
                        * case NULL is returned.
                        */
     834     8897355 : older_delta( sql_delta *d, sql_trans *tr)
     835             : {
     836     8897355 :         sql_delta *o = d->next;
     837             : 
     838     8902513 :         while (o && !o->cs.merged) {
     839        5153 :                 if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     840             :                         break;
     841             :                 else
     842        5158 :                         o = o->next;
     843             :         }
                               /* re-test the loop's break condition: the loop also ends on a merged delta or on NULL */
     844     8897360 :         if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     845           0 :                 return o;
     846             :         return NULL;
     847             : }
     848             : 
     849             : static BAT *
                       /*
                        * Bind the updates of delta d as seen by tr, merging in the updates of
                        * every qualifying older delta found by older_delta().
                        * Returns the update ids for RD_UPD_ID and the update values for
                        * RD_UPD_VAL (the id BAT is still bound internally for the merge and
                        * released before returning).  Returns NULL on failure.
                        */
     850     8897194 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
     851             : {
     852     8897194 :         assert(tr->active);
     853     8897194 :         sql_delta *o = NULL;
     854     8897194 :         BAT *ui = NULL, *uv = NULL;
     855             : 
     856     8897194 :         if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
     857             :                 return NULL;
     858     8897291 :         if (access == RD_UPD_VAL) {
     859     4449071 :                 if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
     860           0 :                         bat_destroy(ui);
     861           0 :                         return NULL;
     862             :                 }
     863             :         }
                               /* walk towards older deltas; d advances so each round looks past the last match */
     864     8897351 :         while ((o = older_delta(d, tr)) != NULL) {
     865           0 :                 BAT *oui = NULL, *ouv = NULL;
                                       /* NOTE(review): oui was just initialised to NULL, so this test is always true */
     866           0 :                 if (!oui)
     867           0 :                         oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
     868           0 :                 if (access == RD_UPD_VAL)
     869           0 :                         ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
     870           0 :                 if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
     871           0 :                         bat_destroy(ui);
     872           0 :                         bat_destroy(uv);
     873           0 :                         bat_destroy(oui);
     874           0 :                         bat_destroy(ouv);
     875           0 :                         return NULL;
     876             :                 }
                                       /* merge_updates() releases its inputs itself when it fails */
     877           0 :                 if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
     878             :                         return NULL;
     879             :                 d = o;
     880             :         }
                               /* value request: the ids were only needed for merging */
     881     8897341 :         if (uv) {
     882     4449129 :                 bat_destroy(ui);
     883     4449129 :                 return uv;
     884             :         }
     885             :         return ui;
     886             : }
     887             : 
     888             : static BAT *
                       /*
                        * Bind the update ids or values (per `access`) of column c for tr, holding
                        * the column lock for the duration.  Returns NULL on failure.
                        */
     889         519 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
     890             : {
     891         519 :         lock_column(tr->store, c->base.id);
     892         519 :         sql_delta *d = col_timestamp_delta(tr, c);
     893         519 :         int type = c->type.type->localtype;
     894             : 
     895         519 :         if (!d) {
     896           0 :                 unlock_column(tr->store, c->base.id);
     897           0 :                 return NULL;
     898             :         }
                               /* for ST_DICT storage the tail type of the stored BAT replaces the column's type.
                                * NOTE(review): the quick_descriptor() result is dereferenced unchecked here,
                                * while cs_real_update_bats() does test it for NULL */
     899         519 :         if (d->cs.st == ST_DICT) {
     900           0 :                 BAT *b = quick_descriptor(d->cs.bid);
     901             : 
     902           0 :                 type = b->ttype;
     903             :         }
     904         519 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     905         519 :         unlock_column(tr->store, c->base.id);
     906         519 :         return bn;
     907             : }
     908             : 
     909             : static BAT *
                       /*
                        * Bind the update ids or values (per `access`) of index i for tr, holding
                        * the column lock on the index id for the duration.  The value type is
                        * TYPE_oid when oid_index(i->type) holds, TYPE_lng otherwise.
                        * Returns NULL on failure.
                        */
     910           0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
     911             : {
     912           0 :         lock_column(tr->store, i->base.id);
     913           0 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     914           0 :         sql_delta *d = idx_timestamp_delta(tr, i);
     915             : 
     916           0 :         if (!d) {
     917           0 :                 unlock_column(tr->store, i->base.id);
     918           0 :                 return NULL;
     919             :         }
     920           0 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     921           0 :         unlock_column(tr->store, i->base.id);
     922           0 :         return bn;
     923             : }
     924             : 
     925             : static BAT *
                       /*
                        * Bind the main BAT of cs:
                        *   QUICK  - quick_descriptor() of cs->bid
                        *   RD_EXT - temp_descriptor() of cs->ebid
                        *   RDONLY - a slice [0, cnt) of cs->bid
                        * Returns NULL when the descriptor cannot be obtained.
                        */
     926     8949327 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
     927             : {
     928     8949327 :         BAT *b;
     929             : 
     930     8949327 :         assert(access == RDONLY || access == QUICK || access == RD_EXT);
     931     8949327 :         assert(cs != NULL);
     932     8949327 :         if (access == QUICK)
     933      136113 :                 return quick_descriptor(cs->bid);
     934     8813214 :         if (access == RD_EXT)
     935         860 :                 return temp_descriptor(cs->ebid);
     936     8812354 :         assert(cs->bid);
     937     8812354 :         b = temp_descriptor(cs->bid);
     938     8812391 :         if (b == NULL)
     939             :                 return NULL;
     940     8812391 :         assert(b->batRestricted == BAT_READ);
     941             :         /* return slice */
     942     8812391 :         BAT *s = BATslice(b, 0, cnt);
                               /* the descriptor on the full BAT is released; only the slice is returned */
     943     8812340 :         bat_destroy(b);
     944     8812340 :         return s;
     945             : }
     946             : 
     947             : static int
                       /*
                        * Bind both the update ids (*ui) and the update values (*uv) of column c
                        * for tr under the column lock.
                        * Returns LOG_OK on success, LOG_ERR otherwise.
                        * NOTE(review): on the failure path after binding, both BATs are
                        * destroyed but *ui / *uv keep their old pointer values.
                        */
     948     4448247 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
     949             : {
     950     4448247 :         lock_column(tr->store, c->base.id);
     951     4448068 :         size_t cnt = count_col(tr, c, 0);
     952     4448409 :         sql_delta *d = col_timestamp_delta(tr, c);
     953     4448381 :         int type = c->type.type->localtype;
     954             : 
     955     4448381 :         if (!d) {
     956           0 :                 unlock_column(tr->store, c->base.id);
     957           0 :                 return LOG_ERR;
     958             :         }
                               /* for ST_DICT storage the tail type of the stored BAT replaces the column's type.
                                * NOTE(review): the quick_descriptor() result is dereferenced unchecked */
     959     4448381 :         if (d->cs.st == ST_DICT) {
     960           2 :                 BAT *b = quick_descriptor(d->cs.bid);
     961             : 
     962           2 :                 type = b->ttype;
     963             :         }
     964             : 
     965     4448381 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     966     4448566 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     967             : 
     968     4448553 :         unlock_column(tr->store, c->base.id);
     969             : 
     970     4448520 :         if (*ui == NULL || *uv == NULL) {
     971           0 :                 bat_destroy(*ui);
     972           0 :                 bat_destroy(*uv);
     973           0 :                 return LOG_ERR;
     974             :         }
     975             :         return LOG_OK;
     976             : }
     977             : 
     978             : static int
                       /*
                        * Index counterpart of bind_updates(): bind the update ids (*ui) and
                        * update values (*uv) of index i for tr under the column lock on the
                        * index id.  Returns LOG_OK on success, LOG_ERR otherwise.
                        * NOTE(review): on the failure path after binding, both BATs are
                        * destroyed but *ui / *uv keep their old pointer values.
                        */
     979          57 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
     980             : {
     981          57 :         lock_column(tr->store, i->base.id);
     982          57 :         size_t cnt = count_idx(tr, i, 0);
     983          57 :         sql_delta *d = idx_timestamp_delta(tr, i);
     984          57 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     985             : 
     986          57 :         if (!d) {
     987           0 :                 unlock_column(tr->store, i->base.id);
     988           0 :                 return LOG_ERR;
     989             :         }
     990             : 
     991          57 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     992          57 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     993             : 
     994          57 :         unlock_column(tr->store, i->base.id);
     995             : 
     996          57 :         if (*ui == NULL || *uv == NULL) {
     997           0 :                 bat_destroy(*ui);
     998           0 :                 bat_destroy(*uv);
     999           0 :                 return LOG_ERR;
    1000             :         }
    1001             :         return LOG_OK;
    1002             : }
    1003             : 
    1004             : static void *                                   /* BAT * */
                       /*
                        * Bind column c for tr according to `access`: RD_UPD_ID / RD_UPD_VAL go
                        * through bind_ucol(), everything else through cs_bind_bat(), limited to
                        * the row count count_col() reports for tr.
                        * Returns NULL when c->t is not a table, when no delta is found, or when
                        * binding fails.
                        */
    1005     8942681 : bind_col(sql_trans *tr, sql_column *c, int access)
    1006             : {
    1007     8942681 :         assert(access == QUICK || tr->active);
    1008     8942681 :         if (!isTable(c->t))
    1009             :                 return NULL;
    1010     8942681 :         sql_delta *d = col_timestamp_delta(tr, c);
    1011     8942462 :         if (!d)
    1012             :                 return NULL;
    1013     8942462 :         size_t cnt = count_col(tr, c, 0);
    1014     8942425 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
    1015         519 :                 return bind_ucol(tr, c, access, cnt);
    1016     8941906 :         BAT *b = cs_bind_bat( &d->cs, access, cnt);
                               /* the tail type must equal the column's local type, unless the column has a
                                * storage_type (and access is not RD_EXT) or this is a QUICK bind with a
                                * negative ttype */
    1017     8942055 :         assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
    1018             :         return b;
    1019             : }
    1020             : 
    1021             : static void *                                   /* BAT * */
                       /*
                        * Bind index i for tr according to `access`: RD_UPD_ID / RD_UPD_VAL go
                        * through bind_uidx(), everything else through cs_bind_bat(), limited to
                        * the row count count_idx() reports for tr.
                        * Returns NULL when i->t is not a table, when no delta is found, or when
                        * binding fails.
                        */
    1022        7361 : bind_idx(sql_trans *tr, sql_idx * i, int access)
    1023             : {
    1024        7361 :         assert(access == QUICK || tr->active);
    1025        7361 :         if (!isTable(i->t))
    1026             :                 return NULL;
    1027        7361 :         sql_delta *d = idx_timestamp_delta(tr, i);
    1028        7361 :         if (!d)
    1029             :                 return NULL;
    1030        7361 :         size_t cnt = count_idx(tr, i, 0);
    1031        7361 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
    1032           0 :                 return bind_uidx(tr, i, access, cnt);
    1033        7361 :         return cs_bind_bat( &d->cs, access, cnt);
    1034             : }
    1035             : 
    1036             : static int
                       /*
                        * Make sure cs owns its own update-id and update-value BATs and hand
                        * descriptors to both back through *Ui and *Uv.
                        * Missing BATs are first created with e_bat(); a BAT for which isEbat()
                        * holds is then replaced by a temp_copy() of it, and cs->uibid /
                        * cs->uvbid are updated to the copy.
                        * Returns LOG_OK on success, LOG_ERR otherwise; on error no descriptor is
                        * left held by this function.
                        * NOTE(review): "missing" is tested as a zero id (!cs->uibid) while
                        * failure is reported as BID_NIL; after a failed e_bat()/temp_copy() the
                        * field is left holding BID_NIL - confirm callers treat that as fatal.
                        */
    1037        3935 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
    1038             : {
    1039        3935 :         if (!cs->uibid) {
    1040           0 :                 cs->uibid = e_bat(TYPE_oid);
    1041           0 :                 if (cs->uibid == BID_NIL)
    1042             :                         return LOG_ERR;
    1043             :         }
    1044        3935 :         if (!cs->uvbid) {
                                       /* the value BAT takes the tail type of the column's current BAT */
    1045           0 :                 BAT *cur = quick_descriptor(cs->bid);
    1046           0 :                 if (!cur)
    1047             :                         return LOG_ERR;
    1048           0 :                 int type = cur->ttype;
    1049           0 :                 cs->uvbid = e_bat(type);
                                       /* NOTE(review): a BID_NIL uibid freshly set above already returned, so the
                                        * first half of this test looks redundant */
    1050           0 :                 if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1051             :                         return LOG_ERR;
    1052             :         }
    1053        3935 :         BAT *ui = temp_descriptor(cs->uibid);
    1054        3935 :         BAT *uv = temp_descriptor(cs->uvbid);
    1055             : 
    1056        3935 :         if (ui == NULL || uv == NULL) {
    1057           0 :                 bat_destroy(ui);
    1058           0 :                 bat_destroy(uv);
    1059           0 :                 return LOG_ERR;
    1060             :         }
    1061        3935 :         assert(ui && uv);
                               /* swap the placeholder id BAT for a copy of its own */
    1062        3935 :         if (isEbat(ui)){
    1063         411 :                 temp_destroy(cs->uibid);
    1064         411 :                 cs->uibid = temp_copy(ui->batCacheid, true, true);
    1065         411 :                 bat_destroy(ui);
    1066         411 :                 if (cs->uibid == BID_NIL ||
    1067         411 :                     (ui = temp_descriptor(cs->uibid)) == NULL) {
    1068           0 :                         bat_destroy(uv);
    1069           0 :                         return LOG_ERR;
    1070             :                 }
    1071             :         }
                               /* same for the value BAT */
    1072        3935 :         if (isEbat(uv)){
    1073         411 :                 temp_destroy(cs->uvbid);
    1074         411 :                 cs->uvbid = temp_copy(uv->batCacheid, true, true);
    1075         411 :                 bat_destroy(uv);
    1076         411 :                 if (cs->uvbid == BID_NIL ||
    1077         411 :                     (uv = temp_descriptor(cs->uvbid)) == NULL) {
    1078           0 :                         bat_destroy(ui);
    1079           0 :                         return LOG_ERR;
    1080             :                 }
    1081             :         }
    1082        3935 :         *Ui = ui;
    1083        3935 :         *Uv = uv;
    1084        3935 :         return LOG_OK;
    1085             : }
    1086             : 
    1087             : static int
                       /*
                        * Locate the segment containing row rid (start <= rid < end) and return 1
                        * when that segment has ts == tr->tid and is not deleted.
                        * Returns 0 otherwise, including when no segment contains rid.
                        */
    1088        6454 : segments_is_append(segment *s, sql_trans *tr, oid rid)
    1089             : {
    1090       93298 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1091       93298 :                 if (s->start <= rid && s->end > rid) {
    1092        6454 :                         if (s->ts == tr->tid && !s->deleted) {
    1093        2625 :                                 return 1;
    1094             :                         }
                                               /* only the first segment containing rid is examined */
    1095             :                         break;
    1096             :                 }
    1097             :         }
    1098             :         return 0;
    1099             : }
    1100             : 
    1101             : static int
                       /*
                        * Locate the segment containing row rid (start <= rid < end) and return 1
                        * when that segment is marked deleted with s->ts >= tr->ts.
                        * Returns 0 otherwise, including when no segment contains rid.
                        */
    1102        3829 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
    1103             : {
    1104       87998 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1105       87998 :                 if (s->start <= rid && s->end > rid) {
    1106        3829 :                         if (s->ts >= tr->ts && s->deleted) {
    1107           0 :                                 return 1;
    1108             :                         }
                                               /* only the first segment containing rid is examined */
    1109             :                         break;
    1110             :                 }
    1111             :         }
    1112             :         return 0;
    1113             : }
    1114             : 
    1115             : static sql_delta *
                       /*
                        * Allocate a new sql_delta as a shallow copy of `bat`, detached from any
                        * chain (next = NULL) and stamped with tr->tid.
                        * Returns NULL when the allocation fails.
                        * This function has zero recorded hits in this coverage run.
                        * NOTE(review): the struct assignment copies every cs field verbatim,
                        * including refcnt and the BAT ids, without taking extra references -
                        * confirm the intended ownership with the callers.
                        */
    1116           0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
    1117             : {
    1118           0 :         sql_delta *n = ZNEW(sql_delta);
    1119           0 :         if (!n)
    1120             :                 return NULL;
    1121           0 :         *n = *bat;
    1122           0 :         n->next = NULL;
    1123           0 :         n->cs.ts = tr->tid;
    1124           0 :         return n;
    1125             : }
    1126             : 
    1127             : static BAT *
    1128          17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
    1129             : {
    1130          17 :         BAT *newoffsets = NULL;
    1131          17 :         sql_delta *bat = *batp;
    1132          17 :         column_storage *cs = &bat->cs;
    1133          17 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1134             : 
    1135          17 :         if (!u)
    1136             :                 return NULL;
    1137          17 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
    1138          17 :         if (DICTprepare4append(&newoffsets, i, u) < 0) {
    1139           0 :                 bat_destroy(u);
    1140           0 :                 return NULL;
    1141             :         } else {
    1142          17 :                 int new = 0;
    1143             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1144          17 :                 if (BATcount(u) >= max_cnt) {
    1145           1 :                         if (max_cnt == 64*1024) { /* decompress */
    1146           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1147           0 :                                         bat_destroy(u);
    1148           0 :                                         return NULL;
    1149             :                                 }
    1150           0 :                                 if (cs->ucnt) {
    1151           0 :                                         BAT *ui = NULL, *uv = NULL;
    1152           0 :                                         BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1153           0 :                                         bat_destroy(b);
    1154           0 :                                         if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1155           0 :                                                 bat_destroy(nb);
    1156           0 :                                                 bat_destroy(u);
    1157           0 :                                                 return NULL;
    1158             :                                         }
    1159           0 :                                         b = nb;
    1160           0 :                                         if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1161           0 :                                                 bat_destroy(ui);
    1162           0 :                                                 bat_destroy(uv);
    1163           0 :                                                 bat_destroy(b);
    1164           0 :                                                 bat_destroy(u);
    1165             :                                         }
    1166           0 :                                         bat_destroy(ui);
    1167           0 :                                         bat_destroy(uv);
    1168             :                                 }
    1169           0 :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1170           0 :                                 bat_destroy(b);
    1171           0 :                                 assert(newoffsets == NULL);
    1172           0 :                                 if (!n) {
    1173           0 :                                         bat_destroy(u);
    1174           0 :                                         return NULL;
    1175             :                                 }
    1176           0 :                                 if (cs->ts != tr->tid) {
    1177           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1178           0 :                                                 bat_destroy(n);
    1179           0 :                                                 return NULL;
    1180             :                                         }
    1181           0 :                                         cs = &(*batp)->cs;
    1182           0 :                                         new = 1;
    1183             :                                 }
    1184           0 :                                 if (cs->bid && !new)
    1185           0 :                                         temp_destroy(cs->bid);
    1186           0 :                                 n = transfer_to_systrans(n);
    1187           0 :                                 if (n == NULL)
    1188             :                                         return NULL;
    1189           0 :                                 bat_set_access(n, BAT_READ);
    1190           0 :                                 cs->bid = temp_create(n);
    1191           0 :                                 bat_destroy(n);
    1192           0 :                                 if (cs->ebid && !new)
    1193           0 :                                         temp_destroy(cs->ebid);
    1194           0 :                                 cs->ebid = 0;
    1195           0 :                                 cs->ucnt = 0;
    1196           0 :                                 if (cs->uibid && !new)
    1197           0 :                                         temp_destroy(cs->uibid);
    1198           0 :                                 if (cs->uvbid && !new)
    1199           0 :                                         temp_destroy(cs->uvbid);
    1200           0 :                                 cs->uibid = cs->uvbid = 0;
    1201           0 :                                 cs->st = ST_DEFAULT;
    1202           0 :                                 cs->cleared = true;
    1203             :                         } else {
    1204           1 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1205           0 :                                         bat_destroy(newoffsets);
    1206           0 :                                         bat_destroy(u);
    1207           0 :                                         return NULL;
    1208             :                                 }
    1209           1 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), PERSISTENT);
    1210           1 :                                 bat_destroy(b);
    1211           1 :                                 if (!n) {
    1212           0 :                                         bat_destroy(newoffsets);
    1213           0 :                                         bat_destroy(u);
    1214           0 :                                         return NULL;
    1215             :                                 }
    1216           1 :                                 if (cs->ts != tr->tid) {
    1217           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1218           0 :                                                 bat_destroy(n);
    1219           0 :                                                 return NULL;
    1220             :                                         }
    1221           0 :                                         cs = &(*batp)->cs;
    1222           0 :                                         new = 1;
    1223           0 :                                         temp_dup(cs->ebid);
    1224           0 :                                         if (cs->uibid) {
    1225           0 :                                                 temp_dup(cs->uibid);
    1226           0 :                                                 temp_dup(cs->uvbid);
    1227             :                                         }
    1228             :                                 }
    1229           1 :                                 if (cs->bid && !new)
    1230           1 :                                         temp_destroy(cs->bid);
    1231           1 :                                 n = transfer_to_systrans(n);
    1232           1 :                                 if (n == NULL)
    1233             :                                         return NULL;
    1234           1 :                                 bat_set_access(n, BAT_READ);
    1235           1 :                                 cs->bid = temp_create(n);
    1236           1 :                                 bat_destroy(n);
    1237           1 :                                 cs->cleared = true;
    1238           1 :                                 i = newoffsets;
    1239             :                         }
    1240             :                 } else { /* append */
    1241          16 :                         i = newoffsets;
    1242             :                 }
    1243             :         }
    1244          17 :         bat_destroy(u);
    1245          17 :         return i;
    1246             : }
    1247             : 
    1248             : static BAT *
    1249           0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
    1250             : {
    1251           0 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1252           0 :         BAT *newoffsets = NULL;
    1253           0 :         BAT *b = NULL, *n = NULL;
    1254             : 
    1255           0 :         if (!(b = temp_descriptor(cs->bid)))
    1256             :                 return NULL;
    1257             : 
    1258           0 :         if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
    1259           0 :                 bat_destroy(b);
    1260           0 :                 return NULL;
    1261             :         } else {
    1262             :                 /* returns new offset bat if values within min/max, else decompress */
    1263           0 :                 if (!newoffsets) { /* decompress */
    1264           0 :                         if (cs->ucnt) {
    1265           0 :                                 BAT *ui = NULL, *uv = NULL;
    1266           0 :                                 BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1267           0 :                                 bat_destroy(b);
    1268           0 :                                 if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1269           0 :                                         bat_destroy(nb);
    1270           0 :                                         return NULL;
    1271             :                                 }
    1272           0 :                                 b = nb;
    1273           0 :                                 if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1274           0 :                                         bat_destroy(ui);
    1275           0 :                                         bat_destroy(uv);
    1276           0 :                                         bat_destroy(b);
    1277             :                                 }
    1278           0 :                                 bat_destroy(ui);
    1279           0 :                                 bat_destroy(uv);
    1280             :                         }
    1281           0 :                         n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
    1282           0 :                         bat_destroy(b);
    1283           0 :                         if (!n)
    1284             :                                 return NULL;
    1285           0 :                         if (cs->bid)
    1286           0 :                                 temp_destroy(cs->bid);
    1287           0 :                         n = transfer_to_systrans(n);
    1288           0 :                         if (n == NULL)
    1289             :                                 return NULL;
    1290           0 :                         bat_set_access(n, BAT_READ);
    1291           0 :                         cs->bid = temp_create(n);
    1292           0 :                         cs->ucnt = 0;
    1293           0 :                         if (cs->uibid)
    1294           0 :                                 temp_destroy(cs->uibid);
    1295           0 :                         if (cs->uvbid)
    1296           0 :                                 temp_destroy(cs->uvbid);
    1297           0 :                         cs->uibid = cs->uvbid = 0;
    1298           0 :                         cs->st = ST_DEFAULT;
    1299           0 :                         cs->cleared = true;
    1300           0 :                         b = n;
    1301             :                 } else { /* append */
    1302             :                         i = newoffsets;
    1303             :                 }
    1304             :         }
    1305           0 :         bat_destroy(b);
    1306           0 :         return i;
    1307             : }
    1308             : 
    1309             : /*
    1310             :  * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
    1311             :  */
    1312             : static int
    1313        3064 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1314             : {
    1315        3064 :         int res = LOG_OK;
    1316        3064 :         sql_delta *bat = *batp;
    1317        3064 :         column_storage *cs = &bat->cs;
    1318        3064 :         BAT *otids = tids, *oupdates = updates;
    1319             : 
    1320        3064 :         if (!BATcount(tids))
    1321             :                 return LOG_OK;
    1322             : 
    1323        3064 :         if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
    1324           6 :                 tids = BATunmask(tids);
    1325           6 :                 if (!tids)
    1326             :                         return LOG_ERR;
    1327             :         }
    1328        3064 :         if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
    1329           0 :                 updates = BATunmask(updates);
    1330           0 :                 if (!updates) {
    1331           0 :                         if (otids != tids)
    1332           0 :                                 bat_destroy(tids);
    1333           0 :                         return LOG_ERR;
    1334             :                 }
    1335        3064 :         } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
    1336          43 :                 updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
    1337          43 :                 if (!updates) {
    1338           0 :                         if (otids != tids)
    1339           0 :                                 bat_destroy(tids);
    1340           0 :                         return LOG_ERR;
    1341             :                 }
    1342             :         }
    1343             : 
    1344        3064 :         if (cs->st == ST_DICT) {
    1345             :                 /* possibly a new array is returned */
    1346           4 :                 BAT *nupdates = dict_append_bat(tr, batp, updates);
    1347           4 :                 bat = *batp;
    1348           4 :                 cs = &bat->cs;
    1349           4 :                 if (oupdates != updates)
    1350           0 :                         bat_destroy(updates);
    1351           4 :                 updates = nupdates;
    1352           4 :                 if (!updates) {
    1353           0 :                         if (otids != tids)
    1354           0 :                                 bat_destroy(tids);
    1355           0 :                         return LOG_ERR;
    1356             :                 }
    1357             :         }
    1358             : 
    1359             :         /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1360             :         /* currently only one update delta is possible */
    1361        3064 :         lock_table(tr->store, t->base.id);
    1362        3064 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1363        3064 :         if (!is_new && !cs->cleared) {
    1364        2742 :                 if (!tids->tsorted /* make sure we have simple dense or oids */) {
    1365           6 :                         BAT *sorted, *order;
    1366           6 :                         if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
    1367           0 :                                 if (otids != tids)
    1368           0 :                                         bat_destroy(tids);
    1369           0 :                                 if (oupdates != updates)
    1370           0 :                                         bat_destroy(updates);
    1371           0 :                                 unlock_table(tr->store, t->base.id);
    1372           0 :                                 return LOG_ERR;
    1373             :                         }
    1374           6 :                         if (otids != tids)
    1375           0 :                                 bat_destroy(tids);
    1376           6 :                         tids = sorted;
    1377           6 :                         BAT *nupdates = BATproject(order, updates);
    1378           6 :                         bat_destroy(order);
    1379           6 :                         if (oupdates != updates)
    1380           0 :                                 bat_destroy(updates);
    1381           6 :                         updates = nupdates;
    1382           6 :                         if (!updates) {
    1383           0 :                                 bat_destroy(tids);
    1384           0 :                                 unlock_table(tr->store, t->base.id);
    1385           0 :                                 return LOG_ERR;
    1386             :                         }
    1387             :                 }
    1388        2742 :                 assert(tids->tsorted);
    1389        2742 :                 BAT *ui = NULL, *uv = NULL;
    1390             : 
    1391             :                 /* handle updates on just inserted bits */
    1392             :                 /* handle updates on updates (within one transaction) */
    1393        2742 :                 BATiter upi = bat_iterator(updates);
    1394        2742 :                 BUN cnt = 0, ucnt = BATcount(tids);
    1395        2742 :                 BAT *b, *ins = NULL;
    1396        2742 :                 int *msk = NULL;
    1397             : 
    1398        2742 :                 if((b = temp_descriptor(cs->bid)) == NULL)
    1399             :                         res = LOG_ERR;
    1400             : 
    1401        2742 :                 if (res == LOG_OK && BATtdense(tids)) {
    1402        2507 :                         oid start = tids->tseqbase, offset = start;
    1403        2507 :                         oid end = start + ucnt;
    1404             : 
    1405        8398 :                         for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
    1406        6525 :                                 if (seg->start <= start && seg->end > start) {
    1407             :                                         /* check for delete conflicts */
    1408        2507 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1409           0 :                                                 res = LOG_CONFLICT;
    1410           0 :                                                 continue;
    1411             :                                         }
    1412             : 
    1413             :                                         /* check for inplace updates */
    1414        2507 :                                         BUN lend = end < seg->end?end:seg->end;
    1415        2507 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1416         133 :                                                 if (!ins) {
    1417         133 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1418         133 :                                                         if (!ins)
    1419             :                                                                 res = LOG_ERR;
    1420             :                                                         else {
    1421         133 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1422         133 :                                                                 msk = (int*)Tloc(ins, 0);
    1423         133 :                                                                 BUN end = (ucnt+31)/32;
    1424         133 :                                                                 memset(msk, 0, end * sizeof(int));
    1425             :                                                         }
    1426             :                                                 }
    1427         726 :                                                 for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
    1428         593 :                                                         const void *upd = BUNtail(upi, rid-offset);
    1429         593 :                                                         if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1430           0 :                                                                 res = LOG_ERR;
    1431             : 
    1432         593 :                                                         oid word = i/32;
    1433         593 :                                                         int pos = i%32;
    1434         593 :                                                         msk[word] |= 1U<<pos;
    1435         593 :                                                         cnt++;
    1436             :                                                 }
    1437             :                                         }
    1438             :                                 }
    1439        6525 :                                 if (end < seg->end)
    1440             :                                         break;
    1441             :                         }
    1442         238 :                 } else if (res == LOG_OK && complex_cand(tids)) {
    1443           3 :                         struct canditer ci;
    1444           3 :                         segment *seg = s->segs->h;
    1445           3 :                         canditer_init(&ci, NULL, tids);
    1446           3 :                         BUN i = 0;
    1447        1036 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1448        1033 :                                 oid rid = canditer_next(&ci);
    1449        1033 :                                 if (seg->end <= rid)
    1450          13 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1451        1020 :                                 else if (seg->start <= rid && seg->end > rid) {
    1452             :                                         /* check for delete conflicts */
    1453        1020 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1454           0 :                                                 res = LOG_CONFLICT;
    1455           0 :                                                 continue;
    1456             :                                         }
    1457             : 
    1458             :                                         /* check for inplace updates */
    1459        1020 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1460           0 :                                                 if (!ins) {
    1461           0 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1462           0 :                                                         if (!ins) {
    1463             :                                                                 res = LOG_ERR;
    1464             :                                                                 break;
    1465             :                                                         } else {
    1466           0 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1467           0 :                                                                 msk = (int*)Tloc(ins, 0);
    1468           0 :                                                                 BUN end = (ucnt+31)/32;
    1469           0 :                                                                 memset(msk, 0, end * sizeof(int));
    1470             :                                                         }
    1471             :                                                 }
    1472           0 :                                                 ptr upd = BUNtail(upi, i);
    1473           0 :                                                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1474           0 :                                                         res = LOG_ERR;
    1475             : 
    1476           0 :                                                 oid word = i/32;
    1477           0 :                                                 int pos = i%32;
    1478           0 :                                                 msk[word] |= 1U<<pos;
    1479           0 :                                                 cnt++;
    1480             :                                         }
    1481        1020 :                                         i++;
    1482             :                                 }
    1483             :                         }
    1484         232 :                 } else if (res == LOG_OK) {
    1485         232 :                         BUN i = 0;
    1486         232 :                         oid *rid = Tloc(tids,0);
    1487         232 :                         segment *seg = s->segs->h;
    1488       28097 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1489       27865 :                                 if (seg->end <= rid[i])
    1490        8113 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1491       19752 :                                 else if (seg->start <= rid[i] && seg->end > rid[i]) {
    1492             :                                         /* check for delete conflicts */
    1493       19752 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1494           0 :                                                 res = LOG_CONFLICT;
    1495           0 :                                                 continue;
    1496             :                                         }
    1497             : 
    1498             :                                         /* check for inplace updates */
    1499       19752 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1500         417 :                                                 if (!ins) {
    1501          47 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1502          47 :                                                         if (!ins) {
    1503             :                                                                 res = LOG_ERR;
    1504             :                                                                 break;
    1505             :                                                         } else {
    1506          47 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1507          47 :                                                                 msk = (int*)Tloc(ins, 0);
    1508          47 :                                                                 BUN end = (ucnt+31)/32;
    1509          47 :                                                                 memset(msk, 0, end * sizeof(int));
    1510             :                                                         }
    1511             :                                                 }
    1512         417 :                                                 const void *upd = BUNtail(upi, i);
    1513         417 :                                                 if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
    1514           0 :                                                         res = LOG_ERR;
    1515             : 
    1516         417 :                                                 oid word = i/32;
    1517         417 :                                                 int pos = i%32;
    1518         417 :                                                 msk[word] |= 1U<<pos;
    1519         417 :                                                 cnt++;
    1520             :                                         }
    1521       19752 :                                         i++;
    1522             :                                 }
    1523             :                         }
    1524             :                 }
    1525             : 
    1526        2742 :                 if (res == LOG_OK && cnt < ucnt) {   /* now handle real updates */
    1527        2562 :                         if (cs->ucnt == 0) {
    1528        2456 :                                 if (cnt) {
    1529           0 :                                         BAT *nins = BATmaskedcands(0, ucnt, ins, false);
    1530           0 :                                         if (nins) {
    1531           0 :                                                 ui = BATproject(nins, tids);
    1532           0 :                                                 uv = BATproject(nins, updates);
    1533           0 :                                                 bat_destroy(nins);
    1534             :                                         }
    1535             :                                 } else {
    1536        2456 :                                         ui = temp_descriptor(tids->batCacheid);
    1537        2456 :                                         uv = temp_descriptor(updates->batCacheid);
    1538             :                                 }
    1539        2456 :                                 if (!ui || !uv) {
    1540             :                                         res = LOG_ERR;
    1541             :                                 } else {
    1542        2456 :                                         temp_destroy(cs->uibid);
    1543        2456 :                                         temp_destroy(cs->uvbid);
    1544        2456 :                                         ui = transfer_to_systrans(ui);
    1545        2456 :                                         uv = transfer_to_systrans(uv);
    1546        2456 :                                         if (ui == NULL || uv == NULL) {
    1547           0 :                                                 BBPreclaim(ui);
    1548           0 :                                                 BBPreclaim(uv);
    1549             :                                                 res = LOG_ERR;
    1550             :                                         } else {
    1551        2456 :                                                 cs->uibid = temp_create(ui);
    1552        2456 :                                                 cs->uvbid = temp_create(uv);
    1553        2456 :                                                 cs->ucnt = BATcount(ui);
    1554             :                                         }
    1555             :                                 }
    1556             :                         } else {
    1557         106 :                                 BAT *nui = NULL, *nuv = NULL;
    1558             : 
    1559             :                                 /* merge taking msk of inserted into account */
    1560         106 :                                 if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
    1561             :                                         res = LOG_ERR;
    1562             : 
    1563         106 :                                 if (res == LOG_OK) {
    1564         106 :                                         const void *upd = NULL;
    1565         106 :                                         nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
    1566         106 :                                         nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
    1567             : 
    1568         106 :                                         if (!nui || !nuv) {
    1569             :                                                 res = LOG_ERR;
    1570             :                                         } else {
    1571         106 :                                                 BATiter ovi = bat_iterator(uv);
    1572             : 
    1573             :                                                 /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
    1574         106 :                                                 BUN uip = 0, uie = BATcount(ui);
    1575         106 :                                                 BUN nip = 0, nie = BATcount(tids);
    1576         106 :                                                 oid uiseqb = ui->tseqbase;
    1577         106 :                                                 oid niseqb = tids->tseqbase;
    1578         106 :                                                 oid *uipt = NULL, *nipt = NULL;
    1579         106 :                                                 BATiter uii = bat_iterator(ui);
    1580         106 :                                                 BATiter tidsi = bat_iterator(tids);
    1581         106 :                                                 if (!BATtdensebi(&uii))
    1582         105 :                                                         uipt = uii.base;
    1583         106 :                                                 if (!BATtdensebi(&tidsi))
    1584         105 :                                                         nipt = tidsi.base;
    1585       26789 :                                                 while (uip < uie && nip < nie && res == LOG_OK) {
    1586       26683 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1587       26683 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1588             : 
    1589       26683 :                                                         if (uiv < niv) {
    1590       15068 :                                                                 upd = BUNtail(ovi, uip);
    1591       30136 :                                                                 if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1592       15068 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1593             :                                                                         res = LOG_ERR;
    1594       15068 :                                                                 uip++;
    1595       11615 :                                                         } else if (uiv == niv) {
    1596             :                                                                 /* handle == */
    1597          18 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1598          18 :                                                                         upd = BUNtail(upi, nip);
    1599          36 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1600          18 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1601             :                                                                                 res = LOG_ERR;
    1602             :                                                                 } else {
    1603           0 :                                                                         upd = BUNtail(ovi, uip);
    1604           0 :                                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1605           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1606             :                                                                                 res = LOG_ERR;
    1607             :                                                                 }
    1608          18 :                                                                 uip++;
    1609          18 :                                                                 nip++;
    1610             :                                                         } else { /* uiv > niv */
    1611       11597 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1612       11597 :                                                                         upd = BUNtail(upi, nip);
    1613       23194 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1614       11597 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1615             :                                                                                 res = LOG_ERR;
    1616             :                                                                 }
    1617       11597 :                                                                 nip++;
    1618             :                                                         }
    1619             :                                                 }
    1620         542 :                                                 while (uip < uie && res == LOG_OK) {
    1621         436 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1622         436 :                                                         upd = BUNtail(ovi, uip);
    1623         872 :                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1624         436 :                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1625             :                                                                 res = LOG_ERR;
    1626         436 :                                                         uip++;
    1627             :                                                 }
    1628         799 :                                                 while (nip < nie && res == LOG_OK) {
    1629         693 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1630         693 :                                                         if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1631         693 :                                                                 upd = BUNtail(upi, nip);
    1632        1386 :                                                                 if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1633         693 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1634             :                                                                         res = LOG_ERR;
    1635             :                                                         }
    1636         693 :                                                         nip++;
    1637             :                                                 }
    1638         106 :                                                 bat_iterator_end(&uii);
    1639         106 :                                                 bat_iterator_end(&tidsi);
    1640         106 :                                                 bat_iterator_end(&ovi);
    1641         106 :                                                 if (res == LOG_OK) {
    1642         106 :                                                         temp_destroy(cs->uibid);
    1643         106 :                                                         temp_destroy(cs->uvbid);
    1644         106 :                                                         nui = transfer_to_systrans(nui);
    1645         106 :                                                         nuv = transfer_to_systrans(nuv);
    1646         106 :                                                         if (nui == NULL || nuv == NULL) {
    1647             :                                                                 res = LOG_ERR;
    1648             :                                                         } else {
    1649         106 :                                                                 cs->uibid = temp_create(nui);
    1650         106 :                                                                 cs->uvbid = temp_create(nuv);
    1651         106 :                                                                 cs->ucnt = BATcount(nui);
    1652             :                                                         }
    1653             :                                                 }
    1654             :                                         }
    1655         106 :                                         bat_destroy(nui);
    1656         106 :                                         bat_destroy(nuv);
    1657             :                                 }
    1658             :                         }
    1659             :                 }
    1660        2742 :                 bat_iterator_end(&upi);
    1661        2742 :                 bat_destroy(b);
    1662        2742 :                 unlock_table(tr->store, t->base.id);
    1663        2742 :                 bat_destroy(ins);
    1664        2742 :                 bat_destroy(ui);
    1665        2742 :                 bat_destroy(uv);
    1666        2742 :                 if (otids != tids)
    1667          10 :                         bat_destroy(tids);
    1668        2742 :                 if (oupdates != updates)
    1669          12 :                         bat_destroy(updates);
    1670        2742 :                 return res;
    1671             :         } else if (is_new || cs->cleared) {
    1672         322 :                 BAT *b = temp_descriptor(cs->bid);
    1673             : 
    1674         322 :                 if (b == NULL) {
    1675             :                         res = LOG_ERR;
    1676             :                 } else {
    1677         322 :                         if (BATcount(b)==0) {
    1678           1 :                                 if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
    1679           0 :                                         res = LOG_ERR;
    1680         321 :                         } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
    1681           0 :                                 res = LOG_ERR;
    1682         322 :                         BBPcold(b->batCacheid);
    1683         322 :                         bat_destroy(b);
    1684             :                 }
    1685             :         }
    1686         322 :         unlock_table(tr->store, t->base.id);
    1687         322 :         if (otids != tids)
    1688           2 :                 bat_destroy(tids);
    1689         322 :         if (oupdates != updates)
    1690          41 :                 bat_destroy(updates);
    1691             :         return res;
    1692             : }
    1693             : 
    1694             : static int
    1695        3064 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1696             : {
    1697        3064 :         return cs_update_bat(tr, bat, t, tids, updates, is_new);
    1698             : }
    1699             : 
    1700             : static void *
    1701           4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
    1702             : {
    1703           4 :         void *newoffsets = NULL;
    1704           4 :         sql_delta *bat = *batp;
    1705           4 :         column_storage *cs = &bat->cs;
    1706           4 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1707             : 
    1708           4 :         if (!u)
    1709             :                 return NULL;
    1710           4 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
    1711           4 :         if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
    1712           0 :                 bat_destroy(u);
    1713           0 :                 return NULL;
    1714             :         } else {
    1715           4 :                 int new = 0;
    1716             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1717           4 :                 if (BATcount(u) >= max_cnt) {
    1718           0 :                         if (max_cnt == 64*1024) { /* decompress */
    1719           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1720           0 :                                         bat_destroy(u);
    1721           0 :                                         return NULL;
    1722             :                                 }
    1723           0 :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1724             :                                 /* TODO decompress updates if any */
    1725           0 :                                 bat_destroy(b);
    1726           0 :                                 assert(newoffsets == NULL);
    1727           0 :                                 if (!n) {
    1728           0 :                                         bat_destroy(u);
    1729           0 :                                         return NULL;
    1730             :                                 }
    1731           0 :                                 if (cs->ts != tr->tid) {
    1732           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1733           0 :                                                 bat_destroy(n);
    1734           0 :                                                 bat_destroy(u);
    1735           0 :                                                 return NULL;
    1736             :                                         }
    1737           0 :                                         cs = &(*batp)->cs;
    1738           0 :                                         new = 1;
    1739           0 :                                         cs->uibid = cs->uvbid = 0;
    1740             :                                 }
    1741           0 :                                 if (cs->bid && !new)
    1742           0 :                                         temp_destroy(cs->bid);
    1743           0 :                                 n = transfer_to_systrans(n);
    1744           0 :                                 if (n == NULL) {
    1745           0 :                                         bat_destroy(u);
    1746           0 :                                         return NULL;
    1747             :                                 }
    1748           0 :                                 bat_set_access(n, BAT_READ);
    1749           0 :                                 cs->bid = temp_create(n);
    1750           0 :                                 bat_destroy(n);
    1751           0 :                                 if (cs->ebid && !new)
    1752           0 :                                         temp_destroy(cs->ebid);
    1753           0 :                                 cs->ebid = 0;
    1754           0 :                                 cs->st = ST_DEFAULT;
    1755             :                                 /* at append_col the column's storage type is cleared */
    1756           0 :                                 cs->cleared = true;
    1757             :                         } else {
    1758           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1759           0 :                                         GDKfree(newoffsets);
    1760           0 :                                         bat_destroy(u);
    1761           0 :                                         return NULL;
    1762             :                                 }
    1763           0 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, PERSISTENT);
    1764           0 :                                 bat_destroy(b);
    1765           0 :                                 if (!n) {
    1766           0 :                                         GDKfree(newoffsets);
    1767           0 :                                         bat_destroy(u);
    1768           0 :                                         return NULL;
    1769             :                                 }
    1770           0 :                                 if (cs->ts != tr->tid) {
    1771           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1772           0 :                                                 bat_destroy(u);
    1773           0 :                                                 bat_destroy(n);
    1774           0 :                                                 return NULL;
    1775             :                                         }
    1776           0 :                                         cs = &(*batp)->cs;
    1777           0 :                                         new = 1;
    1778           0 :                                         temp_dup(cs->ebid);
    1779           0 :                                         if (cs->uibid) {
    1780           0 :                                                 temp_dup(cs->uibid);
    1781           0 :                                                 temp_dup(cs->uvbid);
    1782             :                                         }
    1783             :                                 }
    1784           0 :                                 if (cs->bid)
    1785           0 :                                         temp_destroy(cs->bid);
    1786           0 :                                 n = transfer_to_systrans(n);
    1787           0 :                                 if (n == NULL) {
    1788           0 :                                         bat_destroy(u);
    1789           0 :                                         return NULL;
    1790             :                                 }
    1791           0 :                                 bat_set_access(n, BAT_READ);
    1792           0 :                                 cs->bid = temp_create(n);
    1793           0 :                                 bat_destroy(n);
    1794           0 :                                 cs->cleared = true;
    1795           0 :                                 i = newoffsets;
    1796             :                         }
    1797             :                 } else { /* append */
    1798           4 :                         i = newoffsets;
    1799             :                 }
    1800             :         }
    1801           4 :         bat_destroy(u);
    1802           4 :         return i;
    1803             : }
    1804             : 
    1805             : static void *
    1806           1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
    1807             : {
    1808           1 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1809           1 :         void *newoffsets = NULL;
    1810           1 :         BAT *b = NULL, *n = NULL;
    1811             : 
    1812           1 :         if (!(b = temp_descriptor(cs->bid)))
    1813             :                 return NULL;
    1814             : 
    1815           1 :         if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
    1816           0 :                 bat_destroy(b);
    1817           0 :                 return NULL;
    1818             :         } else {
    1819             :                 /* returns new offset bat if values within min/max, else decompress */
    1820           1 :                 if (!newoffsets) {
    1821           1 :                         n = FORdecompress_(b, offsetval, tt, PERSISTENT);
    1822           1 :                         bat_destroy(b);
    1823           1 :                         if (!n)
    1824             :                                 return NULL;
    1825             :                         /* TODO decompress updates if any */
    1826           1 :                         if (cs->bid)
    1827           1 :                                 temp_destroy(cs->bid);
    1828           1 :                         n = transfer_to_systrans(n);
    1829           1 :                         if (n == NULL)
    1830             :                                 return NULL;
    1831           1 :                         bat_set_access(n, BAT_READ);
    1832           1 :                         cs->bid = temp_create(n);
    1833           1 :                         cs->st = ST_DEFAULT;
    1834             :                         /* at append_col the column's storage type is cleared */
    1835           1 :                         cs->cleared = true;
    1836           1 :                         b = n;
    1837             :                 } else { /* append */
    1838             :                         i = newoffsets;
    1839             :                 }
    1840             :         }
    1841           1 :         bat_destroy(b);
    1842           1 :         return i;
    1843             : }
    1844             : 
    1845             : static int
    1846        6454 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
    1847             : {
    1848        6454 :         void *oupd = upd;
    1849        6454 :         sql_delta *bat = *batp;
    1850        6454 :         column_storage *cs = &bat->cs;
    1851        6454 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1852        6454 :         assert(!is_oid_nil(rid));
    1853        6454 :         int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
    1854             : 
    1855        6454 :         if (cs->st == ST_DICT) {
    1856             :                 /* possibly a new array is returned */
    1857           0 :                 upd = dict_append_val(tr, batp, upd, 1);
    1858           0 :                 bat = *batp;
    1859           0 :                 cs = &bat->cs;
    1860           0 :                 if (!upd)
    1861             :                         return LOG_ERR;
    1862             :         }
    1863             : 
    1864             :         /* check if rid is insert ? */
    1865        6454 :         if (!inplace) {
    1866             :                 /* check conflict */
    1867        3829 :                 if (segments_is_deleted(s->segs->h, tr, rid)) {
    1868           0 :                         if (oupd != upd)
    1869           0 :                                 GDKfree(upd);
    1870           0 :                         return LOG_CONFLICT;
    1871             :                 }
    1872        3829 :                 BAT *ui, *uv;
    1873             : 
    1874             :                 /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1875             :                 /* currently only one update delta is possible */
    1876        3829 :                 if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1877           0 :                         if (oupd != upd)
    1878           0 :                                 GDKfree(upd);
    1879           0 :                         return LOG_ERR;
    1880             :                 }
    1881             : 
    1882        3829 :                 assert(uv->ttype);
    1883        3829 :                 assert(BATcount(ui) == BATcount(uv));
    1884        7658 :                 if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
    1885        3829 :                     BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
    1886           0 :                         if (oupd != upd)
    1887           0 :                                 GDKfree(upd);
    1888           0 :                         bat_destroy(ui);
    1889           0 :                         bat_destroy(uv);
    1890           0 :                         return LOG_ERR;
    1891             :                 }
    1892        3829 :                 assert(BATcount(ui) == BATcount(uv));
    1893        3829 :                 bat_destroy(ui);
    1894        3829 :                 bat_destroy(uv);
    1895        3829 :                 cs->ucnt++;
    1896             :         } else {
    1897        2625 :                 BAT *b = NULL;
    1898             : 
    1899        2625 :                 if((b = temp_descriptor(cs->bid)) == NULL) {
    1900           0 :                         if (oupd != upd)
    1901           0 :                                 GDKfree(upd);
    1902           0 :                         return LOG_ERR;
    1903             :                 }
    1904        2625 :                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
    1905           0 :                         if (oupd != upd)
    1906           0 :                                 GDKfree(upd);
    1907           0 :                         bat_destroy(b);
    1908           0 :                         return LOG_ERR;
    1909             :                 }
    1910        2625 :                 bat_destroy(b);
    1911             :         }
    1912        6454 :         if (oupd != upd)
    1913           0 :                 GDKfree(upd);
    1914             :         return LOG_OK;
    1915             : }
    1916             : 
    1917             : static int
    1918        6454 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
    1919             : {
    1920        6454 :         int res = LOG_OK;
    1921        6454 :         lock_table(tr->store, t->base.id);
    1922        6454 :         res = cs_update_val(tr, bat, t, rid, upd, is_new);
    1923        6454 :         unlock_table(tr->store, t->base.id);
    1924        6454 :         return res;
    1925             : }
    1926             : 
    1927             : static int
    1928      158980 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
    1929             : {
    1930      158980 :         (void)tr;
    1931      158980 :         if (!ocs)
    1932             :                 return LOG_OK;
    1933      158980 :         cs->bid = ocs->bid;
    1934      158980 :         cs->ebid = ocs->ebid;
    1935      158980 :         cs->uibid = ocs->uibid;
    1936      158980 :         cs->uvbid = ocs->uvbid;
    1937      158980 :         cs->ucnt = ocs->ucnt;
    1938             : 
    1939      158980 :         if (temp) {
    1940       25983 :                 cs->bid = temp_copy(cs->bid, true, false);
    1941       25983 :                 if (cs->bid == BID_NIL)
    1942             :                         return LOG_ERR;
    1943             :         } else {
    1944      132997 :                 temp_dup(cs->bid);
    1945             :         }
    1946      158983 :         if (cs->ebid)
    1947           6 :                 temp_dup(cs->ebid);
    1948      158983 :         cs->ucnt = 0;
    1949      158983 :         cs->uibid = e_bat(TYPE_oid);
    1950      158987 :         cs->uvbid = e_bat(type);
    1951      158989 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1952             :                 return LOG_ERR;
    1953      158989 :         cs->st = ocs->st;
    1954      158989 :         return LOG_OK;
    1955             : }
    1956             : 
    1957             : static void
    1958      312620 : destroy_delta(sql_delta *b, bool recursive)
    1959             : {
    1960      312620 :         if (ATOMIC_DEC(&b->cs.refcnt) > 0)
    1961             :                 return;
    1962      293273 :         if (recursive && b->next)
    1963       71886 :                 destroy_delta(b->next, true);
    1964      293273 :         if (b->cs.uibid)
    1965       95439 :                 temp_destroy(b->cs.uibid);
    1966      293273 :         if (b->cs.uvbid)
    1967       95439 :                 temp_destroy(b->cs.uvbid);
    1968      293273 :         if (b->cs.bid)
    1969      293273 :                 temp_destroy(b->cs.bid);
    1970      293273 :         if (b->cs.ebid)
    1971          61 :                 temp_destroy(b->cs.ebid);
    1972      293273 :         b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
    1973      293273 :         _DELETE(b);
    1974             : }
    1975             : 
    /*
     * bind_col_data: return the sql_delta of column 'c' that transaction 'tr'
     * should work on, or NULL on failure.
     *
     * 1. The current head of c->data is returned unchanged when its timestamp
     *    equals tr->tid, or when update_conflict is NULL (the append callers)
     *    and the timestamp is below TRANSACTION_ID_BASE or accepted by
     *    tr_version_of_parent().
     * 2. When the head's timestamp is >= TRANSACTION_ID_BASE and it is not a
     *    version of a parent transaction: with update_conflict given,
     *    *update_conflict is set to true and NULL is returned; without it, the
     *    result of timestamp_delta() is returned unless cs.cleared is set, in
     *    which case NULL is returned.
     * 3. Otherwise a new sql_delta is allocated with ZNEW(), filled by dup_cs()
     *    from the delta returned by timestamp_delta(), stamped with tr->tid,
     *    linked to that older delta through ->next and installed as head of
     *    c->data with ATOMIC_PTR_CAS.  If the CAS fails the new delta is
     *    destroyed (non-recursively, ->next reset to NULL first),
     *    *update_conflict is set when the pointer is non-NULL, and NULL is
     *    returned.
     *
     * NULL is also returned when timestamp_delta(), ZNEW() or dup_cs() fail;
     * *update_conflict is not touched on those paths.
     */
    1976             : static sql_delta *
    1977    15664512 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
    1978             : {
    1979    15664512 :         sql_delta *obat = ATOMIC_PTR_GET(&c->data);
    1980             : 
    1981    15664512 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    1982    15531522 :                 return obat;
    1983      132990 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    1984             :                 /* abort */
    1985          12 :                 if (update_conflict)
    1986           4 :                         *update_conflict = true;
    1987           8 :                 else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
    1988           8 :                         return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
    1989           4 :                 return NULL;
    1990             :         }
    1991      132978 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
    1992             :                 return NULL;
    1993      132976 :         sql_delta* bat = ZNEW(sql_delta);
    1994      132976 :         if (!bat)
    1995             :                 return NULL;
    1996      132976 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    1997      132976 :         if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
    1998           0 :                 destroy_delta(bat, false);
    1999           0 :                 return NULL;
    2000             :         }
    2001      132979 :         bat->cs.ts = tr->tid;
    2002             :         /* only one writer else abort */
    2003      132979 :         bat->next = obat;
    2004      132979 :         if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
    2005           0 :                 bat->next = NULL; /* keep the older delta out of the destroy below */
    2006           0 :                 destroy_delta(bat, false);
    2007           0 :                 if (update_conflict)
    2008           0 :                         *update_conflict = true;
    2009           0 :                 return NULL;
    2010             :         }
    2011             :         return bat;
    2012             : }
    2013             : 
    /*
     * update_col_execute: apply an update to '*delta', dispatching on the
     * shape of the input.
     *
     * With is_bat set, incoming_tids and incoming_values are used as BAT
     * pointers and handed to delta_update_bat(); an empty tids BAT is a no-op
     * that returns LOG_OK.  Otherwise incoming_tids is read as a pointer to a
     * single oid and incoming_values is passed through to delta_update_val().
     * The return value is LOG_OK or whatever the delta_update_* call returned.
     */
    2014             : static int
    2015        9518 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
    2016             : {
    2017        9518 :         int ok = LOG_OK;
    2018             : 
    2019        9518 :         if (is_bat) {
    2020        3064 :                 BAT *tids = incoming_tids;
    2021        3064 :                 BAT *values = incoming_values;
    2022        3064 :                 if (BATcount(tids) == 0)
    2023             :                         return LOG_OK;
    2024        3064 :                 ok = delta_update_bat(tr, delta, table, tids, values, is_new);
    2025             :         } else {
    2026        6454 :                 ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
    2027             :         }
    2028             :         return ok;
    2029             : }
    2030             : 
    /*
     * update_col: update column 'c' within transaction 'tr'.
     *
     * 'tids' and 'upd' are BATs when isbat is set (an empty tids BAT returns
     * LOG_OK immediately); otherwise they are forwarded as single-value
     * pointers to update_col_execute().
     *
     * The writable delta is obtained through bind_col_data(); when that fails
     * the result is LOG_CONFLICT if it flagged an update conflict and LOG_ERR
     * otherwise.  If the bound delta differs from the head of c->data seen on
     * entry, the change is registered with trans_add_table(); the log callback
     * is NULL for tables matching NOT_TO_BE_LOGGED().  After a successful
     * update, sql_trans_alter_storage() is called when the delta still has
     * storage type ST_DEFAULT while c->storage_type is set.
     *
     * NOTE(review): c->data is read in the initialiser of 'odelta' before the
     * 'c == NULL' test further down, so that test cannot guard the first
     * access.
     */
    2031             : static int
    2032        9766 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
    2033             : {
    2034        9766 :         int res = LOG_OK;
    2035        9766 :         bool update_conflict = false;
    2036        9766 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2037             : 
    2038        9766 :         if (isbat) {
    2039        3309 :                 BAT *t = tids;
    2040        3309 :                 if (!BATcount(t))
    2041             :                         return LOG_OK;
    2042             :         }
    2043             : 
    2044        9243 :         if (c == NULL)
    2045             :                 return LOG_ERR;
    2046             : 
    2047        9243 :         if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
    2048           4 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2049             : 
    2050        9239 :         assert(delta && delta->cs.ts == tr->tid);
    2051        9239 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    2052        9239 :         if (odelta != delta) /* bind_col_data returned a delta other than the head seen on entry */
    2053        3449 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    2054             : 
    2055        9239 :         odelta = delta; /* remembered only for the assert after the update */
    2056        9239 :         if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
    2057             :                 return res;
    2058        9239 :         assert(delta == odelta);
    2059        9239 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2060           0 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2061             :         return res;
    2062             : }
    2063             : 
    /*
     * bind_idx_data: index counterpart of bind_col_data(); returns the
     * sql_delta of index 'i' that transaction 'tr' should work on, or NULL on
     * failure.
     *
     * The current head of i->data is returned unchanged when its timestamp
     * equals tr->tid, or when update_conflict is NULL and the timestamp is
     * below TRANSACTION_ID_BASE or accepted by tr_version_of_parent().  When
     * the head's timestamp is >= TRANSACTION_ID_BASE and it is not a version of
     * a parent transaction, NULL is returned (and *update_conflict set to true
     * when the pointer is given); unlike bind_col_data() there is no
     * timestamp_delta() fallback on this path.  Otherwise a new delta is
     * created with ZNEW() and dup_cs() (tail type TYPE_oid when
     * oid_index(i->type) holds, TYPE_lng otherwise), stamped with tr->tid and
     * installed as head of i->data with ATOMIC_PTR_CAS; a failed CAS destroys
     * the new delta, flags *update_conflict when non-NULL and returns NULL.
     */
    2064             : static sql_delta *
    2065        2457 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
    2066             : {
    2067        2457 :         sql_delta *obat = ATOMIC_PTR_GET(&i->data);
    2068             : 
    2069        2457 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    2070        2429 :                 return obat;
    2071          28 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    2072             :                 /* abort */
    2073           0 :                 if (update_conflict)
    2074           0 :                         *update_conflict = true;
    2075           0 :                 return NULL;
    2076             :         }
    2077          28 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
    2078             :                 return NULL;
    2079          28 :         sql_delta* bat = ZNEW(sql_delta);
    2080          28 :         if (!bat)
    2081             :                 return NULL;
    2082          28 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2083          33 :         if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
    2084           0 :                 destroy_delta(bat, false);
    2085           0 :                 return NULL;
    2086             :         }
    2087          28 :         bat->cs.ts = tr->tid;
    2088             :         /* only one writer else abort */
    2089          28 :         bat->next = obat;
    2090          28 :         if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
    2091           0 :                 bat->next = NULL; /* keep the older delta out of the destroy below */
    2092           0 :                 destroy_delta(bat, false);
    2093           0 :                 if (update_conflict)
    2094           0 :                         *update_conflict = true;
    2095           0 :                 return NULL;
    2096             :         }
    2097             :         return bat;
    2098             : }
    2099             : 
    /*
     * update_idx: update index 'i' within transaction 'tr'; the index
     * counterpart of update_col().
     *
     * An empty tids BAT (isbat set) returns LOG_OK immediately.  The writable
     * delta comes from bind_idx_data(); on failure the result is LOG_CONFLICT
     * if an update conflict was flagged and LOG_ERR otherwise.  If the bound
     * delta differs from the head of i->data seen on entry, the change is
     * registered with trans_add_table() (log callback NULL for tables matching
     * NOT_TO_BE_LOGGED()).  The update itself is done by update_col_execute(),
     * whose result is returned.
     *
     * NOTE(review): i->data is read in the initialiser of 'odelta' before the
     * 'i == NULL' test further down, so that test cannot guard the first
     * access.
     */
    2100             : static int
    2101         782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
    2102             : {
    2103         782 :         int res = LOG_OK;
    2104         782 :         bool update_conflict = false;
    2105         782 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    2106             : 
    2107         782 :         if (isbat) {
    2108         782 :                 BAT *t = tids;
    2109         782 :                 if (!BATcount(t))
    2110             :                         return LOG_OK;
    2111             :         }
    2112             : 
    2113         279 :         if (i == NULL)
    2114             :                 return LOG_ERR;
    2115             : 
    2116         279 :         if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
    2117           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2118             : 
    2119         279 :         assert(delta && delta->cs.ts == tr->tid);
    2120         279 :         if (odelta != delta) /* bind_idx_data returned a delta other than the head seen on entry */
    2121          22 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    2122             : 
    2123         279 :         odelta = delta; /* remembered only for the assert after the update */
    2124         279 :         res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
    2125         279 :         assert(delta == odelta);
    2126             :         return res;
    2127             : }
    2128             : 
    /*
     * delta_append_bat: write the values of BAT 'i' into the BAT identified by
     * (*batp)->cs.bid.  Returns LOG_OK or LOG_ERR.
     *
     * - An empty 'i' returns LOG_OK.  If 'i' has type TYPE_msk or is a mask
     *   candidate list, it is first converted with BATunmask(); the converted
     *   BAT is tracked in 'oi' and released before returning.
     * - Everything after that runs between lock_column() and unlock_column()
     *   on 'id'.
     * - For ST_DICT / ST_FOR storage the input is replaced by the BAT returned
     *   from dict_append_bat() / for_append_bat(); a NULL result is an error.
     * - The write itself picks one of five cases:
     *     istemp, no offsets, offset 0, empty target and cs.ucnt == 0:
     *         the target bid is dropped and replaced by 'i' itself (set to
     *         BAT_READ and passed through transfer_to_systrans());
     *     no offsets and offset equals the end of the target: BATappend();
     *     no offsets otherwise: BATupdatepos() at 'offset';
     *     dense offsets starting at the end of the target: BATappend();
     *     any other offsets: BATupdate().
     *
     * NOTE(review): in the first case 'i' is reassigned, so the final
     * 'oi != i' test compares against the transferred BAT rather than the
     * caller's argument -- confirm this is intended.
     */
    2129             : static int
    2130      150147 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type, bool istemp)
    2131             : {
    2132      150147 :         BAT *b, *oi = i;
    2133      150147 :         int err = 0;
    2134      150147 :         sql_delta *bat = *batp;
    2135             : 
    2136      150147 :         assert(!offsets || BATcount(offsets) == BATcount(i));
    2137      150147 :         if (!BATcount(i))
    2138             :                 return LOG_OK;
    2139      150147 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
    2140             :                 return LOG_ERR;
    2141             : 
    2142      150147 :         lock_column(tr->store, id);
    2143      150139 :         if (bat->cs.st == ST_DICT) {
    2144          13 :                 BAT *ni = dict_append_bat(tr, batp, oi);
    2145          13 :                 bat = *batp; /* re-read: dict_append_bat received batp and may have changed it */
    2146          13 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2147           0 :                         bat_destroy(oi);
    2148          13 :                 oi = ni;
    2149          13 :                 if (!oi) {
    2150           0 :                         unlock_column(tr->store, id);
    2151           0 :                         return LOG_ERR;
    2152             :                 }
    2153             :         }
    2154      150139 :         if (bat->cs.st == ST_FOR) {
    2155           0 :                 BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
    2156           0 :                 bat = *batp;
    2157           0 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2158           0 :                         bat_destroy(oi);
    2159           0 :                 oi = ni;
    2160           0 :                 if (!oi) {
    2161           0 :                         unlock_column(tr->store, id);
    2162           0 :                         return LOG_ERR;
    2163             :                 }
    2164             :         }
    2165             : 
    2166      150139 :         b = temp_descriptor(bat->cs.bid);
    2167      150110 :         if (b == NULL) {
    2168           0 :                 unlock_column(tr->store, id);
    2169           0 :                 if (oi != i)
    2170           0 :                         bat_destroy(oi);
    2171           0 :                 return LOG_ERR;
    2172             :         }
    2173      150110 :         if (istemp && !offsets && offset == 0 && BATcount(b) == 0 && bat->cs.ucnt == 0) {
    2174          14 :                 bat_set_access(i, BAT_READ);
    2175          14 :                 if (bat->cs.bid)
    2176          14 :                         temp_destroy(bat->cs.bid);
    2177          14 :                 i = transfer_to_systrans(i);
    2178          14 :                 bat->cs.bid = temp_create(i);
    2179      150096 :         } else if (!offsets && offset == b->hseqbase+BATcount(b)) {
    2180      149908 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2181           2 :                         err = 1;
    2182         176 :         } else if (!offsets) {
    2183         176 :                 if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
    2184           2 :                         err = 1;
    2185          12 :         } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
    2186           0 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2187           2 :                         err = 1;
    2188          12 :         } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
    2189           0 :                         err = 1;
    2190             :         }
    2191      150132 :         bat_destroy(b);
    2192      150148 :         unlock_column(tr->store, id);
    2193             : 
    2194      150138 :         if (oi != i)
    2195          13 :                 bat_destroy(oi);
    2196      150138 :         return (err)?LOG_ERR:LOG_OK;
    2197             : }
    2198             : 
    /*
     * start_of_appends: return the index within 'offsets' of the first entry
     * that is an append to a BAT currently holding 'bcnt' rows; equivalently,
     * the number of leading entries that replace existing rows.
     *
     * Returns 0 for an empty 'offsets' and BATcount(offsets) when the last
     * entry is below bcnt.  The last entry is read as the highest offset, so
     * 'offsets' is presumably expected to be sorted ascending with an oid tail
     * (confirm against callers).  The assert additionally requires that the
     * first append targets position bcnt exactly.
     */
    2199             : // Look at the offsets and find where the replacements end and the appends begin.
    2200             : static BUN
    2201           0 : start_of_appends(BAT *offsets, BUN bcnt)
    2202             : {
    2203           0 :         BUN ocnt = BATcount(offsets);
    2204           0 :         if (ocnt == 0)
    2205             :                 return 0;
    2206             : 
    2207           0 :         BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
    2208           0 :         if (highest < bcnt)
    2209             :                 // all are replacements
    2210             :                 return ocnt;
    2211             : 
    2212             :         // reason backward to find the first append.
    2213             :         // Suppose offsets has 15 entries, bcnt == 100
    2214             :         // and the highest offset in offsets is 109.
    2215           0 :         BUN new_bcnt = highest + 1; // 110
    2216           0 :         BUN nappends = new_bcnt - bcnt; // 10
    2217           0 :         BUN nreplacements = ocnt - nappends; // 5
    2218             : 
    2219             :         // The first append should be to position bcnt
    2220           0 :         assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
    2221             : 
    2222             :         return nreplacements;
    2223             : }
    2224             : 
    2225             : 
    /*
     * delta_append_val: write 'cnt' values from the array 'i' into the BAT
     * identified by (*batp)->cs.bid.  Returns LOG_OK or LOG_ERR.
     *
     * The whole operation runs between lock_column() and unlock_column() on
     * 'id'.  For ST_DICT / ST_FOR storage the input is first passed through
     * dict_append_val() / for_append_val(), which may return a new array; such
     * an array (i != oi) is released with GDKfree() on every exit path after
     * the conversions.
     *
     * With 'offsets' given, start_of_appends() determines how many leading
     * values replace existing rows; those are written with BUNreplacemulti()
     * and the remainder is appended at the current end (the caller must pass
     * offset == oid_nil, see the assert).  Without 'offsets', rows in
     * [offset, BATcount) are overwritten with BUNreplacemultiincr() and the
     * rest is appended; if 'offset' lies beyond the current end, the gap is
     * first filled by BUNappendmulti() with a NULL value pointer.
     *
     * NOTE(review): 'i' is not advanced after either replacement step, so when
     * a replacement is followed by an append (cnt still non-zero) the append
     * is handed the start of the array again -- confirm whether mixed
     * replace+append inputs are expected here.
     */
    2226             : static int
    2227    15377765 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
    2228             : {
    2229    15377765 :         void *oi = i;
    2230    15377765 :         BAT *b;
    2231    15377765 :         lock_column(tr->store, id);
    2232    15377556 :         sql_delta *bat = *batp;
    2233             : 
    2234    15377556 :         if (bat->cs.st == ST_DICT) {
    2235             :                 /* possibly a new array is returned */
    2236           4 :                 i = dict_append_val(tr, batp, i, cnt);
    2237           4 :                 bat = *batp;
    2238           4 :                 if (!i) {
    2239           0 :                         unlock_column(tr->store, id);
    2240           0 :                         return LOG_ERR;
    2241             :                 }
    2242             :         }
    2243    15377556 :         if (bat->cs.st == ST_FOR) {
    2244             :                 /* possibly a new array is returned */
    2245           1 :                 i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
    2246           1 :                 bat = *batp;
    2247           1 :                 if (!i) {
    2248           0 :                         unlock_column(tr->store, id);
    2249           0 :                         return LOG_ERR;
    2250             :                 }
    2251             :         }
    2252             : 
    2253    15377556 :         b = temp_descriptor(bat->cs.bid);
    2254    15377306 :         if (b == NULL) {
    2255           0 :                 if (i != oi)
    2256           0 :                         GDKfree(i);
    2257           0 :                 unlock_column(tr->store, id);
    2258           0 :                 return LOG_ERR;
    2259             :         }
    2260    15377306 :         BUN bcnt = BATcount(b);
    2261             : 
    2262    15377306 :         if (offsets) {
    2263             :                 // The first few might be replacements while later items might be appends.
    2264             :                 // Handle the replacements here while leaving the appends to the code below.
    2265           0 :                 BUN nreplacements = start_of_appends(offsets, bcnt);
    2266             : 
    2267           0 :                 oid *start = Tloc(offsets, 0);
    2268           0 :                 if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
    2269           0 :                         bat_destroy(b);
    2270           0 :                         if (i != oi)
    2271           0 :                                 GDKfree(i);
    2272           0 :                         unlock_column(tr->store, id);
    2273           0 :                         return LOG_ERR;
    2274             :                 }
    2275             : 
    2276             :                 // Replacements have been handled. The rest are appends.
    2277           0 :                 assert(offset == oid_nil);
    2278           0 :                 offset = bcnt;
    2279           0 :                 cnt -= nreplacements;
    2280             :         }
    2281             : 
    2282    15377306 :         if (bcnt > offset){
    2283      559082 :                 size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt; /* number of values that land on existing rows */
    2284      559082 :                 if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
    2285           0 :                         bat_destroy(b);
    2286           0 :                         if (i != oi)
    2287           0 :                                 GDKfree(i);
    2288           0 :                         unlock_column(tr->store, id);
    2289           0 :                         return LOG_ERR;
    2290             :                 }
    2291      559290 :                 cnt -= ccnt;
    2292      559290 :                 offset += ccnt;
    2293             :         }
    2294    15377514 :         if (cnt) {
    2295    14818226 :                 if (BATcount(b) < offset) { /* add space */
    2296       10304 :                         BUN d = offset - BATcount(b);
    2297       10304 :                         if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
    2298           0 :                                 bat_destroy(b);
    2299           0 :                                 if (i != oi)
    2300           0 :                                         GDKfree(i);
    2301           0 :                                 unlock_column(tr->store, id);
    2302           0 :                                 return LOG_ERR;
    2303             :                         }
    2304             :                 }
    2305    14818226 :                 if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
    2306           0 :                         bat_destroy(b);
    2307           0 :                         if (i != oi)
    2308           0 :                                 GDKfree(i);
    2309           0 :                         unlock_column(tr->store, id);
    2310           0 :                         return LOG_ERR;
    2311             :                 }
    2312             :         }
    2313    15377711 :         bat_destroy(b);
    2314    15377581 :         if (i != oi)
    2315           4 :                 GDKfree(i);
    2316    15377581 :         unlock_column(tr->store, id);
    2317    15377581 :         return LOG_OK;
    2318             : }
    2319             : 
    /*
     * dup_storage: initialise 'bat' as a copy of 'obat' for transaction 'tr'.
     * A fresh segment list is created with new_segments(tr, 0) (LOG_ERR if
     * that fails) and the column storage is then duplicated by dup_cs() with
     * tail type TYPE_msk; the result of dup_cs() is returned.
     */
    2320             : static int
    2321       25983 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
    2322             : {
    2323       25983 :         if (!(bat->segs = new_segments(tr, 0)))
    2324             :                 return LOG_ERR;
    2325       25983 :         return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
    2326             : }
    2327             : 
    /*
     * append_col_execute: append 'incoming_data' to '*delta', dispatching on
     * its shape.
     *
     * The delta's cs.merged flag is cleared first.  With isbat set,
     * incoming_data is used as a BAT and passed to delta_append_bat() unless it
     * is empty; 'isnew' is forwarded as that function's 'istemp' argument (the
     * callers in this part of the file pass isTempTable(...)).  Otherwise
     * incoming_data is an array of 'cnt' values of type 'tt' handled by
     * delta_append_val().  Returns LOG_OK or the callee's result.
     */
    2328             : static int
    2329    15527931 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type, bool isnew)
    2330             : {
    2331    15527931 :         int ok = LOG_OK;
    2332             : 
    2333    15527931 :         if ((*delta)->cs.merged)
    2334       36521 :                 (*delta)->cs.merged = false; /* TODO needs to move */
    2335    15527931 :         if (isbat) {
    2336      150150 :                 BAT *bat = incoming_data;
    2337             : 
    2338      150150 :                 if (BATcount(bat))
    2339      150148 :                         ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type, isnew);
    2340             :         } else {
    2341    15377781 :                 ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
    2342             :         }
    2343    15527725 :         return ok;
    2344             : }
    2345             : 
    /*
     * append_col: append data to column 'c' within transaction 'tr'.
     *
     * 'data' is a BAT when isbat is set (an empty BAT returns LOG_OK),
     * otherwise an array of 'cnt' values of type 'tpe'.  The delta is bound
     * with bind_col_data(tr, c, NULL), i.e. without conflict reporting; a NULL
     * result yields LOG_ERR.  If append_col_execute() replaced the delta
     * pointer, the new delta is linked in front of the old one and installed
     * in c->data with ATOMIC_PTR_CAS; a failed CAS destroys the new delta and
     * returns LOG_CONFLICT.  Finally sql_trans_alter_storage() is called when
     * the delta has storage type ST_DEFAULT while c->storage_type is set.
     *
     * NOTE(review): the value 'odelta' is initialised with is overwritten by
     * 'odelta = delta' before it is ever read.
     */
    2346             : static int
    2347    15526520 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2348             : {
    2349    15526520 :         int res = LOG_OK;
    2350    15526520 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2351             : 
    2352    15526520 :         if (isbat) {
    2353      149944 :                 BAT *t = data;
    2354      149944 :                 if (!BATcount(t))
    2355             :                         return LOG_OK;
    2356             :         }
    2357             : 
    2358    15525725 :         if ((delta = bind_col_data(tr, c, NULL)) == NULL)
    2359             :                 return LOG_ERR;
    2360             : 
    2361    15525738 :         assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
    2362             : 
    2363    15525738 :         odelta = delta;
    2364    15525738 :         if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type, isTempTable(c->t))) != LOG_OK)
    2365             :                 return res;
    2366    15525539 :         if (odelta != delta) { /* the append produced a replacement delta */
    2367           0 :                 delta->next = odelta;
    2368           0 :                 if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
    2369           0 :                         delta->next = NULL;
    2370           0 :                         destroy_delta(delta, false);
    2371           0 :                         return LOG_CONFLICT;
    2372             :                 }
    2373             :         }
    2374    15525539 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2375           1 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2376             :         return res;
    2377             : }
    2378             : 
    /*
     * append_idx: append data to index 'i' within transaction 'tr'.
     *
     * 'data' is a BAT when isbat is set (an empty BAT returns LOG_OK),
     * otherwise an array of 'cnt' values of type 'tpe'.  The delta is bound
     * with bind_idx_data(tr, i, NULL); a NULL result yields LOG_ERR.  Index
     * deltas are asserted to use ST_DEFAULT storage, and no storage_type string
     * is passed on.  Returns the result of append_col_execute().
     */
    2379             : static int
    2380        2184 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2381             : {
    2382        2184 :         int res = LOG_OK;
    2383        2184 :         sql_delta *delta;
    2384             : 
    2385        2184 :         if (isbat) {
    2386        1010 :                 BAT *t = data;
    2387        1010 :                 if (!BATcount(t))
    2388             :                         return LOG_OK;
    2389             :         }
    2390             : 
    2391        2172 :         if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
    2392             :                 return LOG_ERR;
    2393             : 
    2394        2172 :         assert(delta->cs.st == ST_DEFAULT);
    2395             : 
    2396        2172 :         res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL, isTempTable(i->t));
    2397        2172 :         return res;
    2398             : }
    2399             : 
    /*
     * deletes_conflict_updates: report whether a delete on table 't' collides
     * with pending updates.
     *
     * Returns 1 as soon as some column's current delta is not VALID_4_READ for
     * 'tr' and has a non-zero cs.ucnt; returns 0 otherwise.  The check is per
     * table: 'rid' and 'cnt' are accepted but not used yet (see the TODO).
     */
    2400             : static int
    2401       74772 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
    2402             : {
    2403       74772 :         int err = 0; /* never assigned below, so the !err loop test is always true */
    2404             : 
    2405             :         /* TODO check for conflicting updates */
    2406       74772 :         (void)rid;
    2407       74772 :         (void)cnt;
    2408      555162 :         for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
    2409      480390 :                 sql_column *c = n->data;
    2410      480390 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    2411             : 
    2412             :                 /* check for active updates */
    2413      480390 :                 if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
    2414             :                         return 1;
    2415             :         }
    2416             :         return 0;
    2417             : }
    2418             : 
    /*
     * storage_delete_val: delete the single row 'rid' of table 't' in
     * transaction 'tr'.
     *
     * Under lock_table(), the segment list of 's' is walked to find the segment
     * with start <= rid < end.  LOG_CONFLICT is returned when that segment is
     * not SEG_VALID_4_DELETE for 'tr' or when deletes_conflict_updates()
     * reports a clash; LOG_ERR when split_segment() fails.  The table lock is
     * released on every path.  On success, if segments_in_transaction() was
     * false on entry, the change is registered with trans_add_obj() (log
     * callback NULL for tables matching NOT_TO_BE_LOGGED()).
     *
     * NOTE(review): if no segment contains 'rid' the loop simply ends and the
     * function still returns LOG_OK -- confirm callers guarantee a valid rid.
     */
    2419             : static int
    2420       71843 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
    2421             : {
    2422       71843 :         int in_transaction = segments_in_transaction(tr, t);
    2423             : 
    2424       71843 :         lock_table(tr->store, t->base.id);
    2425             :         /* find segment of rid, split, mark new segment deleted (for tr->tid) */
    2426       71843 :         segment *seg = s->segs->h, *p = NULL;
    2427    51777800 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2428    51777800 :                 if (seg->start <= rid && seg->end > rid) {
    2429       71843 :                         if (!SEG_VALID_4_DELETE(seg,tr)) {
    2430           4 :                                 unlock_table(tr->store, t->base.id);
    2431           4 :                                 return LOG_CONFLICT;
    2432             :                         }
    2433       71839 :                         if (deletes_conflict_updates( tr, t, rid, 1)) {
    2434           0 :                                 unlock_table(tr->store, t->base.id);
    2435           0 :                                 return LOG_CONFLICT;
    2436             :                         }
    2437       71839 :                         if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
    2438           0 :                                 unlock_table(tr->store, t->base.id);
    2439           0 :                                 return LOG_ERR;
    2440             :                         }
    2441             :                         break;
    2442             :                 }
    2443             :         }
    2444       71839 :         unlock_table(tr->store, t->base.id);
    2445       71839 :         if (!in_transaction)
    2446       12604 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2447             :         return LOG_OK;
    2448             : }
    2449             : 
    /*
     * seg_delete_range: delete the rows [start, start+cnt) of table 't',
     * walking the segment list from '*Seg'.
     *
     * For each segment containing the current 'start', the part of the range
     * inside that segment (lcnt) is handled: segments already deleted for 'tr'
     * (SEG_IS_DELETED) are skipped, otherwise the segment must be
     * SEG_VALID_4_DELETE and free of conflicting updates (LOG_CONFLICT if not)
     * and is split with split_segment() (LOG_ERR if that fails).  After a
     * split, '*Seg' is set to the segment returned by split_segment(), which
     * lets the caller continue from there on the next call.  The walk stops
     * once the remaining range ends within the current segment.
     *
     * No lock is taken here; the callers visible in this file wrap the call in
     * lock_table()/unlock_table().
     */
    2450             : static int
    2451        2931 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
    2452             : {
    2453        2931 :         segment *seg = *Seg, *p = NULL;
    2454        9672 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2455        9670 :                 if (seg->start <= start && seg->end > start) {
    2456        2981 :                         size_t lcnt = cnt;
    2457        2981 :                         if (start+lcnt > seg->end) /* clip to the rows that lie in this segment */
    2458          54 :                                 lcnt = seg->end-start;
    2459        2981 :                         if (SEG_IS_DELETED(seg, tr)) {
    2460          47 :                                 start += lcnt;
    2461          47 :                                 cnt -= lcnt;
    2462          47 :                                 continue; /* note: also bypasses the end-of-range test below */
    2463        2934 :                         } else if (!SEG_VALID_4_DELETE(seg, tr))
    2464           1 :                                 return LOG_CONFLICT;
    2465        2933 :                         if (deletes_conflict_updates( tr, t, start, lcnt))
    2466             :                                 return LOG_CONFLICT;
    2467        2933 :                         *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
    2468        2933 :                         if (!seg)
    2469             :                                 return LOG_ERR;
    2470        2933 :                         start += lcnt;
    2471        2933 :                         cnt -= lcnt;
    2472             :                 }
    2473        9622 :                 if (start+cnt <= seg->end)
    2474             :                         break;
    2475             :         }
    2476             :         return LOG_OK;
    2477             : }
    2478             : 
    /*
     * delete_range: delete the rows [start, start+cnt) of table 't' by running
     * seg_delete_range() from the head of the segment list of 's'.  The updated
     * segment cursor is discarded; the result of seg_delete_range() is
     * returned.
     */
    2479             : static int
    2480         566 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
    2481             : {
    2482         566 :         segment *seg = s->segs->h;
    2483         566 :         return seg_delete_range(tr, t, s, &seg, start, cnt);
    2484             : }
    2485             : 
    2486             : static int
    2487         303 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
    2488             : {
    2489         303 :         int in_transaction = segments_in_transaction(tr, t);
    2490         303 :         BAT *oi = i;    /* update ids */
    2491         303 :         int ok = LOG_OK;
    2492             : 
    2493         303 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
    2494             :                 return LOG_ERR;
    2495         303 :         if (BATcount(i)) {
    2496         534 :                 if (BATtdense(i)) {
    2497         231 :                         size_t start = i->tseqbase;
    2498         231 :                         size_t cnt = BATcount(i);
    2499             : 
    2500         231 :                         lock_table(tr->store, t->base.id);
    2501         231 :                         ok = delete_range(tr, t, s, start, cnt);
    2502         231 :                         unlock_table(tr->store, t->base.id);
    2503          72 :                 } else if (complex_cand(i)) {
    2504           0 :                         struct canditer ci;
    2505           0 :                         oid f = 0, l = 0, cur = 0;
    2506             : 
    2507           0 :                         canditer_init(&ci, NULL, i);
    2508           0 :                         cur = f = canditer_next(&ci);
    2509             : 
    2510           0 :                         lock_table(tr->store, t->base.id);
    2511           0 :                         if (!is_oid_nil(f)) {
    2512           0 :                                 segment *seg = s->segs->h;
    2513           0 :                                 for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
    2514           0 :                                         if (cur+1 == l) {
    2515           0 :                                                 cur++;
    2516           0 :                                                 continue;
    2517             :                                         }
    2518           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2519           0 :                                         f = cur = l;
    2520             :                                 }
    2521           0 :                                 if (ok == LOG_OK)
    2522           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2523             :                         }
    2524           0 :                         unlock_table(tr->store, t->base.id);
    2525             :                 } else {
    2526          72 :                         if (!i->tsorted) {
    2527           0 :                                 assert(oi == i);
    2528           0 :                                 BAT *ni = NULL;
    2529           0 :                                 if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
    2530           0 :                                         ok = LOG_ERR;
    2531           0 :                                 if (ni)
    2532           0 :                                         i = ni;
    2533             :                         }
    2534          72 :                         assert(i->tsorted);
    2535          72 :                         BUN icnt = BATcount(i);
    2536          72 :                         BATiter ii = bat_iterator(i);
    2537          72 :                         oid *o = ii.base, n = o[0]+1;
    2538          72 :                         size_t lcnt = 1;
    2539             : 
    2540          72 :                         lock_table(tr->store, t->base.id);
    2541          72 :                         segment *seg = s->segs->h;
    2542       24025 :                         for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
    2543       23953 :                                 if (o[i] == n) {
    2544       23610 :                                         lcnt++;
    2545       23610 :                                         n++;
    2546             :                                 } else {
    2547         343 :                                         ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2548         343 :                                         lcnt = 0;
    2549             :                                 }
    2550       23953 :                                 if (!lcnt) {
    2551         343 :                                         n = o[i]+1;
    2552         343 :                                         lcnt = 1;
    2553             :                                 }
    2554             :                         }
    2555          72 :                         bat_iterator_end(&ii);
    2556          72 :                         if (lcnt && ok == LOG_OK)
    2557          72 :                                 ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2558          72 :                         unlock_table(tr->store, t->base.id);
    2559             :                 }
    2560             :         }
    2561         303 :         if (i != oi)
    2562          25 :                 bat_destroy(i);
    2563             :         // assert
    2564         303 :         if (!in_transaction)
    2565         271 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2566             :         return ok;
    2567             : }
    2568             : 
    2569             : static void /* Drop one reference on a segments list; when sql_ref_dec() no longer reports a positive count, free every segment and the list itself. */
    2570       51321 : destroy_segments(segments *s) /* s may be NULL, in which case this is a no-op */
    2571             : {
    2572       51321 :         if (!s || sql_ref_dec(&s->r) > 0) /* nothing to free, or the decremented count is still positive */
    2573           0 :                 return;
    2574       51321 :         segment *seg = s->h;
    2575      110994 :         while(seg) {
    2576       59673 :                 segment *n = ATOMIC_PTR_GET(&seg->next); /* fetch the successor before seg is released */
    2577       59673 :                 ATOMIC_PTR_DESTROY(&seg->next);
    2578       59673 :                 _DELETE(seg);
    2579       59673 :                 seg = n;
    2580             :         }
    2581       51321 :         _DELETE(s);
    2582             : }
    2583             : 
    2584             : static void /* Drop one reference on a storage object; once the count is no longer positive, release its successor, segments and BATs, then free it. */
    2585       52711 : destroy_storage(storage *bat) /* bat must be non-NULL: it is dereferenced unconditionally */
    2586             : {
    2587       52711 :         if (ATOMIC_DEC(&bat->cs.refcnt) > 0) /* decremented count still positive: keep the object */
    2588             :                 return;
    2589       51180 :         if (bat->next)
    2590       14396 :                 destroy_storage(bat->next); /* drop one reference on the next storage in the chain (recursive) */
    2591       51180 :         destroy_segments(bat->segs);
    2592       51180 :         if (bat->cs.uibid) /* bat ids are only released when non-zero */
    2593       30575 :                 temp_destroy(bat->cs.uibid);
    2594       51180 :         if (bat->cs.uvbid)
    2595       30575 :                 temp_destroy(bat->cs.uvbid);
    2596       51180 :         if (bat->cs.bid)
    2597       51180 :                 temp_destroy(bat->cs.bid);
    2598       51180 :         bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0; /* clear the ids just before the struct is freed */
    2599       51180 :         _DELETE(bat);
    2600             : }
    2601             : 
    2602             : static int /* Return 1 if any segment in segs fails VALID_4_READ for tr, 0 otherwise. */
    2603      162112 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted) /* uncommitted != 0: test every segment; 0: only segments with ts < TRANSACTION_ID_BASE */
    2604             : {
    2605      162112 :         if (uncommitted) {
    2606      433727 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2607      286686 :                         if (!VALID_4_READ(s->ts,tr))
    2608             :                                 return 1;
    2609             :         } else {
    2610      122720 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2611      108847 :                         if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr)) /* segments with ts >= TRANSACTION_ID_BASE are skipped in this mode */
    2612             :                                 return 1;
    2613             :         }
    2614             : 
    2615             :         return 0;
    2616             : }
    2617             : 
    2618             : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
    2619             : 
    2620             : storage * /* Return the table's current storage, or (when clear != NULL) install and return a fresh "cleared" storage owned by tr; NULL on conflict or failure. */
    2621     2181837 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear) /* clear: NULL for a plain lookup; otherwise set to true on the conflict paths below */
    2622             : {
    2623     2181837 :         storage *obat;
    2624             : 
    2625     2181837 :         obat = ATOMIC_PTR_GET(&t->data);
    2626             : 
    2627     2181837 :         if (obat->cs.ts != tr->tid) /* head version is not stamped with this transaction's id */
    2628     1505767 :                 if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) /* ...and is not accepted as a parent's version */
    2629     1505712 :                         if (obat->cs.ts >= TRANSACTION_ID_BASE) { /* ...and its ts is in the transaction-id range */
    2630             :                                 /* abort */
    2631       15490 :                                 if (clear)
    2632       15490 :                                         *clear = true;
    2633       15490 :                                 return NULL;
    2634             :                         }
    2635             : 
    2636     2166347 :         if (!clear) /* plain lookup: hand back the current head */
    2637             :                 return obat;
    2638             : 
    2639             :         /* remainder is only to handle clear */
    2640       26283 :         if (segments_conflict(tr, obat->segs, 1)) { /* any segment not valid for reading by tr counts as a conflict */
    2641         300 :                 *clear = true;
    2642         300 :                 return NULL;
    2643             :         }
    2644       25983 :         if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data)))) /* NOTE(review): this and the next two NULL returns leave *clear untouched - confirm callers expect that */
    2645             :                 return NULL;
    2646       25983 :         storage *bat = ZNEW(storage);
    2647       25983 :         if (!bat)
    2648             :                 return NULL;
    2649       25983 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2650       25983 :         if (dup_storage(tr, obat, bat) != LOG_OK) {
    2651           0 :                 destroy_storage(bat);
    2652           0 :                 return NULL;
    2653             :         }
    2654       25983 :         bat->cs.cleared = true;
    2655       25983 :         bat->cs.ts = tr->tid; /* stamp the new version with this transaction's id */
    2656             :         /* only one writer else abort */
    2657       25983 :         bat->next = obat;
    2658       25983 :         if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) { /* publish only if t->data still equals bat->next (obat) */
    2659           6 :                 bat->next = NULL; /* detach first so destroy_storage() does not touch the next storage */
    2660           6 :                 destroy_storage(bat);
    2661           6 :                 if (clear) /* NOTE(review): always true here, the !clear case returned above */
    2662           6 :                         *clear = true;
    2663           6 :                 return NULL;
    2664             :         }
    2665             :         return bat;
    2666             : }
    2667             : 
    2668             : static int /* Delete rows from table t: either all oids in a BAT (isbat) or the single oid that ib points to. Returns LOG_OK or LOG_ERR. */
    2669       72195 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat) /* ib: BAT * when isbat is true, otherwise oid * */
    2670             : {
    2671       72195 :         int ok = LOG_OK;
    2672       72195 :         BAT *b = ib;
    2673       72195 :         storage *bat;
    2674             : 
    2675       72195 :         if (isbat && !BATcount(b)) /* empty BAT: nothing to delete, reported as success */
    2676             :                 return ok;
    2677             : 
    2678       72146 :         if (t == NULL)
    2679             :                 return LOG_ERR;
    2680             : 
    2681       72146 :         if ((bat = bind_del_data(tr, t, NULL)) == NULL) /* NULL clear argument: plain lookup, no cleared copy is created */
    2682             :                 return LOG_ERR;
    2683             : 
    2684       72146 :         if (isbat)
    2685         303 :                 ok = storage_delete_bat(tr, t, bat, ib);
    2686             :         else
    2687       71843 :                 ok = storage_delete_val(tr, t, bat, *(oid*)ib);
    2688             :         return ok;
    2689             : }
    2690             : 
    2691             : static size_t /* Estimate the number of distinct values in column c using BATguess_uniques(). */
    2692           0 : dcount_col(sql_trans *tr, sql_column *c) /* returns 0 for non-table objects and 1 when no delta or segment tail is available */
    2693             : {
    2694           0 :         sql_delta *b;
    2695             : 
    2696           0 :         if (!isTable(c->t))
    2697             :                 return 0;
    2698           0 :         b = col_timestamp_delta(tr, c);
    2699           0 :         if (!b)
    2700             :                 return 1;
    2701             : 
    2702           0 :         storage *s = ATOMIC_PTR_GET(&c->t->data);
    2703           0 :         if (!s || !s->segs->t)
    2704             :                 return 1;
    2705           0 :         size_t cnt = s->segs->t->end; /* end of the tail segment is used as the row count */
    2706           0 :         if (cnt) {
    2707           0 :                 BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
    2708           0 :                 size_t dcnt = 0;
    2709             : 
    2710           0 :                 if (v) /* if the bind fails the estimate stays 0 */
    2711           0 :                         dcnt = BATguess_uniques(v, NULL);
    2712           0 :                 return dcnt;
    2713             :         }
    2714             :         return cnt; /* cnt is 0 here */
    2715             : }
    2716             : 
    2717             : static BAT * /* If b is a view, release it and return a descriptor of its parent BAT instead; otherwise return b unchanged. */
    2718     3749839 : bind_no_view(BAT *b, bool quick) /* quick selects quick_descriptor() over temp_descriptor() for the parent; returns NULL if that lookup fails */
    2719             : {
    2720     3749839 :         if (isVIEW(b)) { /* If it is a view get the parent BAT */
    2721     3747910 :                 BAT *nb = BBP_desc(VIEWtparent(b));
    2722     3747910 :                 bat_destroy(b); /* the view itself is released here, including on the failure path below */
    2723     3747908 :                 if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
    2724             :                         return NULL;
    2725             :         }
    2726             :         return b;
    2727             : }
    2728             : 
    2729             : static int /* Store externally supplied statistics on column c: a unique-count estimate on its BAT and/or copies of min and max values. */
    2730           0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max) /* each of unique_est, min, max may be NULL to skip it; returns 1 if a min or max copy was stored, else 0 */
    2731             : {
    2732           0 :         int ok = 0; /* NOTE(review): only the min/max branches set this; a unique_est-only call returns 0 */
    2733           0 :         assert(tr->active);
    2734           0 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2735           0 :                 return 0;
    2736           0 :         lock_column(tr->store, c->base.id); /* held until the unlock_column() at the end */
    2737           0 :         if (unique_est) {
    2738           0 :                 sql_delta *d;
    2739           0 :                 if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) { /* estimate is only applied for ST_DEFAULT storage */
    2740           0 :                         BAT *b;
    2741           0 :                         if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
    2742           0 :                                 MT_lock_set(&b->theaplock);
    2743           0 :                                 b->tunique_est = *unique_est;
    2744           0 :                                 MT_lock_unset(&b->theaplock);
    2745           0 :                                 bat_destroy(b);
    2746             :                         }
    2747             :                 }
    2748             :         }
    2749           0 :         if (min) {
    2750           0 :                 _DELETE(c->min); /* the previous value is dropped before allocating, so c->min ends up NULL if GDKmalloc fails */
    2751           0 :                 size_t minlen = ATOMlen(c->type.type->localtype, min);
    2752           0 :                 if ((c->min = GDKmalloc(minlen)) != NULL) {
    2753           0 :                         memcpy(c->min, min, minlen);
    2754           0 :                         ok = 1;
    2755             :                 }
    2756             :         }
    2757           0 :         if (max) {
    2758           0 :                 _DELETE(c->max); /* same replace-then-copy sequence as for min */
    2759           0 :                 size_t maxlen = ATOMlen(c->type.type->localtype, max);
    2760           0 :                 if ((c->max = GDKmalloc(maxlen)) != NULL) {
    2761           0 :                         memcpy(c->max, max, maxlen);
    2762           0 :                         ok = 1;
    2763             :                 }
    2764             :         }
    2765           0 :         unlock_column(tr->store, c->base.id);
    2766           0 :         return ok;
    2767             : }
    2768             : 
    2769             : static int /* Make sure c->min and c->max hold copies of the column BAT's min and max values. Returns 1 when both are available, 0 otherwise. */
    2770          26 : min_max_col(sql_trans *tr, sql_column *c)
    2771             : {
    2772          26 :         int ok = 0;
    2773          26 :         BAT *b = NULL;
    2774          26 :         sql_delta *d = NULL;
    2775             : 
    2776          26 :         assert(tr->active);
    2777          26 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2778           0 :                 return 0;
    2779          26 :         if (c->min && c->max) /* already cached: checked here without the column lock */
    2780             :                 return 1;
    2781          26 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2782          26 :                 if (d->cs.st == ST_FOR) /* ST_FOR storage is not handled */
    2783             :                         return 0;
    2784          26 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2785          26 :                 lock_column(tr->store, c->base.id);
    2786          26 :                 if (c->min && c->max) { /* re-check under the lock */
    2787           0 :                         unlock_column(tr->store, c->base.id);
    2788           0 :                         return 1;
    2789             :                 }
    2790          26 :                 _DELETE(c->min); /* drop any half-populated state before recomputing */
    2791          26 :                 _DELETE(c->max);
    2792          26 :                 if ((b = bind_col(tr, c, access))) {
    2793          26 :                         if (!(b = bind_no_view(b, false))) {
    2794           0 :                                 unlock_column(tr->store, c->base.id);
    2795           0 :                                 return 0;
    2796             :                         }
    2797          26 :                         BATiter bi = bat_iterator(b);
    2798          26 :                         if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) { /* only when the iterator knows both positions */
    2799          17 :                                 const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
    2800          17 :                                 size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
    2801             : 
    2802          17 :                                 if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) { /* on either allocation failure, release both and leave ok at 0 */
    2803           0 :                                         _DELETE(c->min);
    2804           0 :                                         _DELETE(c->max);
    2805             :                                 } else {
    2806          17 :                                         memcpy(c->min, nmin, minlen);
    2807          17 :                                         memcpy(c->max, nmax, maxlen);
    2808          17 :                                         ok = 1;
    2809             :                                 }
    2810             :                         }
    2811          26 :                         bat_iterator_end(&bi);
    2812          26 :                         bat_destroy(b);
    2813             :                 }
    2814          26 :                 unlock_column(tr->store, c->base.id);
    2815             :         }
    2816             :         return ok;
    2817             : }
    2818             : 
    2819             : static size_t /* Count the segments in the list starting at s, following the atomic next pointers. */
    2820          17 : count_segs(segment *s) /* s may be NULL, giving 0 */
    2821             : {
    2822          17 :         size_t nr = 0;
    2823             : 
    2824          72 :         for( ; s; s = ATOMIC_PTR_GET(&s->next))
    2825          55 :                 nr++;
    2826          17 :         return nr;
    2827             : }
    2828             : 
    2829             : static size_t /* Return a count taken from table t's storage as seen by tr; which count is selected by access. */
    2830          34 : count_del(sql_trans *tr, sql_table *t, int access) /* access: 2 = cs.ucnt, 1 = count_inserts(), 10 = number of segments, anything else = count_deletes() */
    2831             : {
    2832          34 :         storage *d;
    2833             : 
    2834          34 :         if (!isTable(t))
    2835             :                 return 0;
    2836          34 :         d = tab_timestamp_storage(tr, t);
    2837          34 :         if (!d) /* no storage for this transaction: reported as 0 */
    2838             :                 return 0;
    2839          34 :         if (access == 2)
    2840           0 :                 return d->cs.ucnt;
    2841          34 :         if (access == 1)
    2842           0 :                 return count_inserts(d->segs->h, tr);
    2843          34 :         if (access == 10) /* special case for counting the number of segments */
    2844          17 :                 return count_segs(d->segs->h);
    2845          17 :         return count_deletes(d->segs->h, tr);
    2846             : }
    2847             : 
    2848             : static int /* Return non-zero if the column's BAT is flagged sorted or reverse-sorted, 0 otherwise. */
    2849       27002 : sorted_col(sql_trans *tr, sql_column *col)
    2850             : {
    2851       27002 :         int sorted = 0;
    2852             : 
    2853       27002 :         assert(tr->active);
    2854       27002 :         if (!isTable(col->t) || !col->t->s) /* NOTE(review): col is dereferenced here, before the `col &&` test below - that test is either redundant or too late */
    2855             :                 return 0;
    2856             : 
    2857       27002 :         if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
    2858       26983 :                 BAT *b = bind_col(tr, col, QUICK); /* no bat_destroy() follows this QUICK bind in this function */
    2859             : 
    2860       26983 :                 if (b)
    2861       26983 :                         sorted = b->tsorted || b->trevsorted;
    2862             :         }
    2863             :         return sorted;
    2864             : }
    2865             : 
    2866             : static int /* Return the tkey flag of the column's BAT (non-zero when flagged as key/unique), 0 when unavailable. */
    2867        7547 : unique_col(sql_trans *tr, sql_column *col)
    2868             : {
    2869        7547 :         int distinct = 0;
    2870             : 
    2871        7547 :         assert(tr->active);
    2872        7547 :         if (!isTable(col->t) || !col->t->s) /* NOTE(review): col is dereferenced here, before the `col &&` test below */
    2873             :                 return 0;
    2874             : 
    2875        7547 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2876        7547 :                 BAT *b = bind_col(tr, col, QUICK); /* no bat_destroy() follows this QUICK bind in this function */
    2877             : 
    2878        7547 :                 if (b)
    2879        7547 :                         distinct = b->tkey;
    2880             :         }
    2881             :         return distinct;
    2882             : }
    2883             : 
    2884             : static int /* Report a small integer describing dictionary / string double-elimination state of a column; 0 when none applies. */
    2885        1920 : double_elim_col(sql_trans *tr, sql_column *col) /* returns 1 or 2 for ST_DICT columns bound as bte/sht; for string columns a value asserted to lie in 0..16 */
    2886             : {
    2887        1920 :         int de = 0;
    2888        1920 :         sql_delta *d;
    2889             : 
    2890        1920 :         assert(tr->active);
    2891        1920 :         if (!isTable(col->t) || !col->t->s) /* NOTE(review): col is dereferenced here, before the `col &&` tests below */
    2892             :                 return 0;
    2893             : 
    2894        1920 :         if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
    2895           6 :                 if (d->cs.st == ST_DICT) {
    2896           6 :                         BAT *b = bind_col(tr, col, QUICK);
    2897           6 :                         if (b && b->ttype == TYPE_bte) /* the bound BAT's tail type selects the result: bte -> 1, sht -> 2 */
    2898             :                                 de = 1;
    2899           0 :                         else if (b && b->ttype == TYPE_sht)
    2900        1920 :                                 de = 2;
    2901             :                 }
    2902        1914 :         } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
    2903        1914 :                 BAT *b = bind_col(tr, col, QUICK);
    2904             : 
    2905        1914 :                 if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
    2906        1914 :                         de = GDK_ELIMDOUBLES(b->tvheap);
    2907        1914 :                         if (de)
    2908        1688 :                                 de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET); /* used bytes of the var heap divided by GDK_VAROFFSET, rounded up */
    2909             :                 }
    2910        1688 :                 assert(de >= 0 && de <= 16);
    2911             :         }
    2912             :         return de;
    2913             : }
    2914             : 
    2915             : static int /* Collect statistics for column c: nil-freeness, uniqueness, unique-count estimate, and min/max values. */
    2916     3787802 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max) /* returns a bitmask: 1 = *min was filled, 2 = *max was filled; the three flag outputs are always reset first */
    2917             : {
    2918     3787802 :         int ok = 0;
    2919     3787802 :         BAT *b = NULL, *off = NULL, *upv = NULL;
    2920     3787802 :         sql_delta *d = NULL;
    2921             : 
    2922     3787802 :         (void) tr; /* NOTE(review): tr is also passed to bind_col() below, so this cast looks unnecessary */
    2923     3787802 :         assert(tr->active);
    2924     3787802 :         *nonil = false;
    2925     3787802 :         *unique = false;
    2926     3787802 :         *unique_est = 0.0;
    2927     3787802 :         if (!c || !isTable(c->t) || !c->t->s)
    2928             :                 return ok;
    2929             : 
    2930     3787583 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2931     3748568 :                 if (d->cs.st == ST_FOR) {
    2932          30 :                         *nonil = true; /* TODO for min/max. I will do it later */
    2933          30 :                         return ok;
    2934             :                 }
    2935     3748538 :                 int eclass = c->type.type->eclass;
    2936     3748538 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2937     3748538 :                 if ((b = bind_col(tr, c, access))) {
    2938     3748590 :                         if (!(b = bind_no_view(b, false)))
    2939           0 :                                 return ok;
    2940     3748520 :                         BATiter bi = bat_iterator(b);
    2941     3748527 :                         *nonil = bi.nonil && !bi.nil;
    2942             : 
    2943     3748527 :                         if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) && /* min/max only for these type classes... */
    2944     3451060 :                                 d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) { /* ...with ucnt == 0 and at least one known position */
    2945     2273262 :                                 if (c->min && VALinit(min, bi.type, c->min)) /* prefer the value cached on the column */
    2946             :                                         ok |= 1;
    2947     2273208 :                                 else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos))) /* otherwise take it from the BAT's min position */
    2948     2264131 :                                         ok |= 1;
    2949     2273221 :                                 if (c->max && VALinit(max, bi.type, c->max))
    2950          54 :                                         ok |= 2;
    2951     2273167 :                                 else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
    2952     2257989 :                                         ok |= 2;
    2953             :                         }
    2954     3748477 :                         if (d->cs.ucnt == 0) { /* uniqueness is only reported when ucnt is 0 */
    2955     3745724 :                                 if (d->cs.st == ST_DEFAULT) {
    2956     3745019 :                                         *unique = bi.key;
    2957     3745019 :                                         *unique_est = bi.unique_est;
    2958     3745019 :                                         if (*unique_est == 0) /* no stored estimate: compute one now */
    2959     1194935 :                                                 *unique_est = (double)BATguess_uniques(b,NULL);
    2960         705 :                                 } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) { /* off is never passed to bat_destroy() here - presumably fine for quick descriptors, confirm */
    2961             :                                         /* for dict, check the offsets bat for uniqueness */
    2962         705 :                                         MT_lock_set(&off->theaplock);
    2963         705 :                                         *unique = off->tkey;
    2964         705 :                                         *unique_est = off->tunique_est;
    2965         705 :                                         MT_lock_unset(&off->theaplock);
    2966             :                                 }
    2967             :                         }
    2968     3748482 :                         bat_iterator_end(&bi);
    2969     3748544 :                         bat_destroy(b);
    2970     3748534 :                         if (*nonil && d->cs.ucnt > 0) { /* ucnt > 0: the update-values BAT must be nil-free too */
    2971             :                                 /* This could use a quick descriptor */
    2972         519 :                                 if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
    2973           0 :                                         *nonil = false; /* cannot inspect the updates: fall back to "may contain nil" */
    2974             :                                 } else {
    2975         519 :                                         MT_lock_set(&upv->theaplock);
    2976         519 :                                         *nonil &= upv->tnonil && !upv->tnil;
    2977         519 :                                         MT_lock_unset(&upv->theaplock);
    2978         519 :                                         bat_destroy(upv);
    2979             :                                 }
    2980             :                         }
    2981             :                 }
    2982             :         }
    2983             :         return ok;
    2984             : }
    2985             : 
    2986             : static int /* Set (add_range) or remove the GDK_MIN_BOUND / GDK_MAX_BOUND / GDK_NOT_NULL properties on the column's BAT from partition pt. Always returns LOG_OK. */
    2987         257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range) /* pt is only read when add_range is true */
    2988             : {
    2989         257 :         assert(tr->active);
    2990         257 :         if (!isTable(col->t) || !col->t->s) /* NOTE(review): col is dereferenced here, before the `col &&` test below */
    2991             :                 return LOG_OK;
    2992             : 
    2993         252 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2994         252 :                 BAT *b = bind_col(tr, col, QUICK);
    2995             : 
    2996         252 :                 if (b) { /* add props for ranges [min, max> */
    2997         252 :                         MT_lock_set(&b->theaplock); /* the _nolock property calls below run under this lock */
    2998         252 :                         if (add_range) {
    2999         179 :                                 BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
    3000         179 :                                 if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0) /* a nil maxvalue removes the upper bound instead of setting it */
    3001         103 :                                         BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
    3002             :                                 else
    3003          76 :                                         BATrmprop_nolock(b, GDK_MAX_BOUND);
    3004         179 :                                 if (!pt->with_nills || !col->null) /* partition excludes nils, or col->null is false */
    3005         117 :                                         BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    3006             :                         } else {
    3007          73 :                                 BATrmprop_nolock(b, GDK_MIN_BOUND);
    3008          73 :                                 BATrmprop_nolock(b, GDK_MAX_BOUND);
    3009          73 :                                 BATrmprop_nolock(b, GDK_NOT_NULL);
    3010             :                         }
    3011         252 :                         MT_lock_unset(&b->theaplock);
    3012             :                 }
    3013             :         }
    3014             :         return LOG_OK;
    3015             : }
    3016             : 
    3017             : static int /* Set (not_null) or remove the GDK_NOT_NULL property on the column's BAT. Always returns LOG_OK. */
    3018        4158 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
    3019             : {
    3020        4158 :         assert(tr->active);
    3021        4158 :         if (!isTable(col->t) || !col->t->s) /* NOTE(review): col is dereferenced here, before the `col &&` test below */
    3022             :                 return LOG_OK;
    3023             : 
    3024        4128 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    3025        4128 :                 BAT *b = bind_col(tr, col, QUICK);
    3026             : 
    3027        4128 :                 if (b) { /* set or clear the NOT NULL property */
    3028        4128 :                         if (not_null) {
    3029        4126 :                                 BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    3030             :                         } else {
    3031           2 :                                 BATrmprop(b, GDK_NOT_NULL);
    3032             :                         }
    3033             :                 }
    3034             :         }
    3035             :         return LOG_OK;
    3036             : }
    3037             : 
    3038             : static int /* Fill column_storage cs from the logger: look up the BAT registered under id and attach fresh update BATs. Returns LOG_OK or LOG_ERR. */
    3039       29108 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id) /* type is the tail type used for the update-values BAT */
    3040             : {
    3041       29108 :         sqlstore *store = tr->store;
    3042       29108 :         int bid = log_find_bat(store->logger, id);
    3043       29108 :         if (bid <= 0) /* the logger returned no usable BAT id for this object id */
    3044             :                 return LOG_ERR;
    3045       29108 :         cs->bid = temp_dup(bid);
    3046       29108 :         cs->ucnt = 0;
    3047       29108 :         cs->uibid = e_bat(TYPE_oid); /* update-ids BAT obtained from e_bat() */
    3048       29108 :         cs->uvbid = e_bat(type); /* update-values BAT obtained from e_bat() */
    3049       29108 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL) /* cs->bid stays set on this error path; nothing is released here */
    3050             :                 return LOG_ERR;
    3051             :         return LOG_OK;
    3052             : }
    3053             : 
    3054             : static int /* Make sure the delta has update BATs and, unless running in memory, register its main BAT with the logger under id. Returns LOG_OK or LOG_ERR. */
    3055       67604 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
    3056             : {
    3057       67604 :         int res = LOG_OK;
    3058       67604 :         gdk_return ok;
    3059       67604 :         BAT *b = temp_descriptor(bat->cs.bid);
    3060             : 
    3061       67604 :         if (b == NULL)
    3062             :                 return LOG_ERR;
    3063             : 
    3064       67604 :         if (!bat->cs.uibid) /* create the update BATs only when their ids are still 0 */
    3065       67596 :                 bat->cs.uibid = e_bat(TYPE_oid);
    3066       67604 :         if (!bat->cs.uvbid)
    3067       67596 :                 bat->cs.uvbid = e_bat(b->ttype);
    3068       67604 :         if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
    3069           0 :                 res = LOG_ERR; /* remembered and returned later; b is still released on every path */
    3070       67604 :         if (GDKinmemory(0)) { /* in-memory mode: skip the logger call */
    3071         174 :                 bat_destroy(b);
    3072         174 :                 return res;
    3073             :         }
    3074             : 
    3075       67430 :         bat_set_access(b, BAT_READ);
    3076       67430 :         sqlstore *store = tr->store;
    3077       67430 :         ok = log_bat_persists(store->logger, b, id);
    3078       67430 :         bat_destroy(b);
    3079       67430 :         if(res != LOG_OK) /* an earlier e_bat() failure takes precedence over the logger result */
    3080             :                 return res;
    3081       67430 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3082             : }
    3083             : 
    3084             : static int
    3085           0 : new_persistent_delta( sql_delta *bat)
    3086             : {
    3087           0 :         bat->cs.ucnt = 0;
    3088           0 :         return LOG_OK;
    3089             : }
    3090             : 
    3091             : static void
    3092      128720 : create_delta( sql_delta *d, BAT *b)
    3093             : {
    3094      128720 :         bat_set_access(b, BAT_READ);
    3095      128720 :         d->cs.bid = temp_create(b);
    3096      128720 :         d->cs.uibid = d->cs.uvbid = 0;
    3097      128720 :         d->cs.ucnt = 0;
    3098      128720 : }
    3099             : 
    3100             : static bat
    3101        7280 : copyBat (bat i, int type, oid seq)
    3102             : {
    3103        7280 :         BAT *b, *tb;
    3104        7280 :         bat res;
    3105             : 
    3106        7280 :         if (!i)
    3107             :                 return i;
    3108        7280 :         tb = quick_descriptor(i);
    3109        7280 :         if (tb == NULL)
    3110             :                 return 0;
    3111        7280 :         b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
    3112        7280 :         if (b == NULL)
    3113             :                 return 0;
    3114             : 
    3115        7280 :         bat_set_access(b, BAT_READ);
    3116             : 
    3117        7280 :         res = temp_create(b);
    3118        7280 :         bat_destroy(b);
    3119        7280 :         return res;
    3120             : }
    3121             : 
    3122             : static int
    3123      151035 : create_col(sql_trans *tr, sql_column *c)
    3124             : {
    3125      151035 :         int ok = LOG_OK, new = 0;
    3126      151035 :         int type = c->type.type->localtype;
    3127      151035 :         sql_delta *bat = ATOMIC_PTR_GET(&c->data);
    3128             : 
    3129      151035 :         if (!bat) {
    3130      151035 :                 new = 1;
    3131      151035 :                 bat = ZNEW(sql_delta);
    3132      151035 :                 if (!bat)
    3133             :                         return LOG_ERR;
    3134      151035 :                 ATOMIC_PTR_SET(&c->data, bat);
    3135      151035 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3136             :         }
    3137             : 
    3138      151035 :         if (new)
    3139      151035 :                 bat->cs.ts = tr->tid;
    3140             : 
    3141      151035 :         if (!isNew(c)&& !isTempTable(c->t)){
    3142       22293 :                 bat->cs.ts = tr->ts;
    3143       22293 :                 ok = load_cs(tr, &bat->cs, type, c->base.id);
    3144       22293 :                 if (ok == LOG_OK && c->storage_type) {
    3145           4 :                         if (strcmp(c->storage_type, "DICT") == 0) {
    3146           2 :                                 sqlstore *store = tr->store;
    3147           2 :                                 int bid = log_find_bat(store->logger, -c->base.id);
    3148           2 :                                 if (bid <= 0)
    3149             :                                         return LOG_ERR;
    3150           2 :                                 bat->cs.ebid = temp_dup(bid);
    3151           2 :                                 bat->cs.st = ST_DICT;
    3152           2 :                         } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
    3153           2 :                                 bat->cs.st = ST_FOR;
    3154             :                         }
    3155             :                 }
    3156       22293 :                 return ok;
    3157      128742 :         } else if (bat && bat->cs.bid) {
    3158           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
    3159             :         } else {
    3160      128742 :                 sql_column *fc = NULL;
    3161      128742 :                 size_t cnt = 0;
    3162             : 
    3163             :                 /* alter ? */
    3164      128742 :                 if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
    3165       79180 :                         storage *s = tab_timestamp_storage(tr, fc->t);
    3166       79180 :                         if (s == NULL)
    3167             :                                 return LOG_ERR;
    3168       79180 :                         cnt = segs_end(s->segs, tr, c->t);
    3169             :                 }
    3170      128742 :                 if (cnt && fc != c) {
    3171          22 :                         sql_delta *d = ATOMIC_PTR_GET(&fc->data);
    3172             : 
    3173          22 :                         if (d->cs.bid) {
    3174          22 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3175          22 :                                 if(bat->cs.bid == BID_NIL)
    3176          22 :                                         ok = LOG_ERR;
    3177             :                         }
    3178          22 :                         if (d->cs.uibid) {
    3179          10 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3180          10 :                                 if (bat->cs.uibid == BID_NIL)
    3181          22 :                                         ok = LOG_ERR;
    3182             :                         }
    3183          22 :                         if (d->cs.uvbid) {
    3184          10 :                                 bat->cs.uvbid = e_bat(type);
    3185          10 :                                 if(bat->cs.uvbid == BID_NIL)
    3186           0 :                                         ok = LOG_ERR;
    3187             :                         }
    3188             :                 } else {
    3189      128720 :                         BAT *b = bat_new(type, c->t->sz, PERSISTENT);
    3190      128720 :                         if (!b) {
    3191             :                                 ok = LOG_ERR;
    3192             :                         } else {
    3193      128720 :                                 create_delta(ATOMIC_PTR_GET(&c->data), b);
    3194      128720 :                                 bat_destroy(b);
    3195             :                         }
    3196             : 
    3197      128720 :                         if (!new) {
    3198           0 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3199           0 :                                 if (bat->cs.uibid == BID_NIL)
    3200           0 :                                         ok = LOG_ERR;
    3201           0 :                                 bat->cs.uvbid = e_bat(type);
    3202           0 :                                 if(bat->cs.uvbid == BID_NIL)
    3203           0 :                                         ok = LOG_ERR;
    3204             :                         }
    3205             :                 }
    3206      128742 :                 bat->cs.ucnt = 0;
    3207             : 
    3208      128742 :                 if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
    3209          90 :                         trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
    3210             :         }
    3211             :         return ok;
    3212             : }
    3213             : 
    3214             : static int
    3215       61249 : log_create_col_(sql_trans *tr, sql_column *c)
    3216             : {
    3217       61249 :         assert(!isTempTable(c->t));
    3218       61249 :         return log_create_delta(tr,  ATOMIC_PTR_GET(&c->data), c->base.id);
    3219             : }
    3220             : 
    3221             : static int
    3222          86 : log_create_col(sql_trans *tr, sql_change *change)
    3223             : {
    3224          86 :         return log_create_col_(tr, (sql_column*)change->obj);
    3225             : }
    3226             : 
    3227             : static int
    3228      116111 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
    3229             : {
    3230      116111 :         (void) t; // TODO transaction_layer_revamp: remove if unnecessary
    3231      116111 :         (void)oldest;
    3232      116111 :         assert(delta->cs.ts == tr->tid);
    3233      116111 :         delta->cs.ts = commit_ts;
    3234             : 
    3235      116111 :         assert(delta->next == NULL);
    3236      116111 :         if (!delta->cs.merged)
    3237      116110 :                 merge_delta(delta);
    3238      116111 :         if (!tr->parent)
    3239      116107 :                 base->new = 0;
    3240      116111 :         return LOG_OK;
    3241             : }
    3242             : 
    3243             : static int
    3244          90 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3245             : {
    3246          90 :         sql_column *c = (sql_column*)change->obj;
    3247          90 :         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3248          90 :         if (!tr->parent)
    3249          89 :                 c->base.new = 0;
    3250          90 :         return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
    3251             : }
    3252             : 
    3253             : /* will be called for new idx's and when new index columns are created */
    3254             : static int
    3255        9448 : create_idx(sql_trans *tr, sql_idx *ni)
    3256             : {
    3257        9448 :         int ok = LOG_OK, new = 0;
    3258        9448 :         sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
    3259        9448 :         int type = TYPE_lng;
    3260             : 
    3261        9448 :         if (oid_index(ni->type))
    3262         955 :                 type = TYPE_oid;
    3263             : 
    3264        9448 :         if (!bat) {
    3265        9448 :                 new = 1;
    3266        9448 :                 bat = ZNEW(sql_delta);
    3267        9448 :                 if (!bat)
    3268             :                         return LOG_ERR;
    3269        9448 :                 ATOMIC_PTR_INIT(&ni->data, bat);
    3270        9448 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3271             :         }
    3272             : 
    3273        9448 :         if (new)
    3274        9448 :                 bat->cs.ts = tr->tid;
    3275             : 
    3276        9448 :         if (!isNew(ni) && !isTempTable(ni->t)){
    3277        2190 :                 bat->cs.ts = 1;
    3278        2190 :                 return load_cs(tr, &bat->cs, type, ni->base.id);
    3279        7258 :         } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
    3280           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
    3281             :         } else {
    3282        7258 :                 sql_column *c = ol_first_node(ni->t->columns)->data;
    3283        7258 :                 sql_delta *d = col_timestamp_delta(tr, c);
    3284             : 
    3285        7258 :                 if (d) {
    3286             :                         /* Here we also handle indices created through alter stmts */
    3287             :                         /* These need to be created aligned to the existing data */
    3288        7258 :                         if (d->cs.bid) {
    3289        7258 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3290        7258 :                                 if(bat->cs.bid == BID_NIL)
    3291        7258 :                                         ok = LOG_ERR;
    3292             :                         }
    3293             :                 } else {
    3294             :                         return LOG_ERR;
    3295             :                 }
    3296             : 
    3297        7258 :                 bat->cs.ucnt = 0;
    3298             : 
    3299        7258 :                 if (!new) {
    3300           0 :                         bat->cs.uibid = e_bat(TYPE_oid);
    3301           0 :                         if (bat->cs.uibid == BID_NIL)
    3302           0 :                                 ok = LOG_ERR;
    3303           0 :                         bat->cs.uvbid = e_bat(type);
    3304           0 :                         if(bat->cs.uvbid == BID_NIL)
    3305           0 :                                 ok = LOG_ERR;
    3306             :                 }
    3307        7258 :                 bat->cs.ucnt = 0;
    3308        7258 :                 if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
    3309         631 :                         trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
    3310             :         }
    3311             :         return ok;
    3312             : }
    3313             : 
    3314             : static int
    3315        6355 : log_create_idx_(sql_trans *tr, sql_idx *i)
    3316             : {
    3317        6355 :         assert(!isTempTable(i->t));
    3318        6355 :         return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3319             : }
    3320             : 
    3321             : static int
    3322         617 : log_create_idx(sql_trans *tr, sql_change *change)
    3323             : {
    3324         617 :         return log_create_idx_(tr, (sql_idx*)change->obj);
    3325             : }
    3326             : 
    3327             : static int
    3328         631 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3329             : {
    3330         631 :         sql_idx *i = (sql_idx*)change->obj;
    3331         631 :         sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3332         631 :         if (!tr->parent)
    3333         631 :                 i->base.new = 0;
    3334         631 :         return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
    3335             :         return LOG_OK;
    3336             : }
    3337             : 
    3338             : static int
    3339        4625 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
    3340             : {
    3341        4625 :         int ok = load_cs(tr, &s->cs, TYPE_msk, id);
    3342        4625 :         BAT *b = NULL, *ib = NULL;
    3343             : 
    3344        4625 :         if (ok != LOG_OK)
    3345             :                 return ok;
    3346        4625 :         if (!(b = temp_descriptor(s->cs.bid)))
    3347             :                 return LOG_ERR;
    3348        4625 :         ib = b;
    3349             : 
    3350        4625 :         if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
    3351           0 :                 bat_destroy(ib);
    3352           0 :                 return LOG_ERR;
    3353             :         }
    3354             : 
    3355        4625 :         if (BATcount(b)) {
    3356         284 :                 if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
    3357           0 :                         bat_destroy(ib);
    3358           0 :                         return LOG_ERR;
    3359             :                 }
    3360         418 :                 if (BATtdense(b)) {
    3361         134 :                         size_t start = b->tseqbase;
    3362         134 :                         size_t cnt = BATcount(b);
    3363         134 :                         ok = delete_range(tr, t, s, start, cnt);
    3364             :                 } else {
    3365         150 :                         assert(b->tsorted);
    3366         150 :                         BUN icnt = BATcount(b);
    3367         150 :                         BATiter bi = bat_iterator(b);
    3368         150 :                         size_t lcnt = 1;
    3369         150 :                         oid n;
    3370         150 :                         segment *seg = s->segs->h;
    3371         150 :                         if (complex_cand(b)) {
    3372           0 :                                 oid o = * (oid *) Tpos(&bi, 0);
    3373           0 :                                 n = o + 1;
    3374           0 :                                 for (BUN i = 1; i < icnt; i++) {
    3375           0 :                                         o = * (oid *) Tpos(&bi, i);
    3376           0 :                                         if (o == n) {
    3377           0 :                                                 lcnt++;
    3378           0 :                                                 n++;
    3379             :                                         } else {
    3380           0 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3381             :                                                         break;
    3382             :                                                 lcnt = 0;
    3383             :                                         }
    3384           0 :                                         if (!lcnt) {
    3385           0 :                                                 n = o + 1;
    3386           0 :                                                 lcnt = 1;
    3387             :                                         }
    3388             :                                 }
    3389             :                         } else {
    3390         150 :                                 oid *o = bi.base;
    3391         150 :                                 n = o[0]+1;
    3392      207947 :                                 for (size_t i=1; i<icnt; i++) {
    3393      207797 :                                         if (o[i] == n) {
    3394      205847 :                                                 lcnt++;
    3395      205847 :                                                 n++;
    3396             :                                         } else {
    3397        1950 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3398             :                                                         break;
    3399             :                                                 lcnt = 0;
    3400             :                                         }
    3401      207797 :                                         if (!lcnt) {
    3402        1950 :                                                 n = o[i]+1;
    3403        1950 :                                                 lcnt = 1;
    3404             :                                         }
    3405             :                                 }
    3406             :                         }
    3407         150 :                         if (lcnt && ok == LOG_OK)
    3408         150 :                                 ok = delete_range(tr, t, s, n-lcnt, lcnt);
    3409         150 :                         bat_iterator_end(&bi);
    3410             :                 }
    3411         284 :                 if (ok == LOG_OK)
    3412        4920 :                         for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
    3413        4636 :                                 if (seg->ts == tr->tid)
    3414        2405 :                                         seg->ts = 1;
    3415             :         } else {
    3416        4341 :                 if (ok == LOG_OK) {
    3417        4341 :                         BAT *bb = quick_descriptor(s->cs.bid);
    3418             : 
    3419        4341 :                         if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
    3420             :                                 ok = LOG_ERR;
    3421             :                         } else {
    3422        4341 :                                 segment *seg = s->segs->h;
    3423        4341 :                                 if (seg->ts == tr->tid)
    3424        4341 :                                         seg->ts = 1;
    3425             :                         }
    3426             :                 }
    3427             :         }
    3428        4625 :         if (b != ib)
    3429        4625 :                 bat_destroy(b);
    3430        4625 :         bat_destroy(ib);
    3431             : 
    3432        4625 :         return ok;
    3433             : }
    3434             : 
    3435             : static int
    3436       25236 : create_del(sql_trans *tr, sql_table *t)
    3437             : {
    3438       25236 :         int ok = LOG_OK, new = 0;
    3439       25236 :         BAT *b;
    3440       25236 :         storage *bat = ATOMIC_PTR_GET(&t->data);
    3441             : 
    3442       25236 :         if (!bat) {
    3443       25236 :                 new = 1;
    3444       25236 :                 bat = ZNEW(storage);
    3445       25236 :                 if(!bat)
    3446             :                         return LOG_ERR;
    3447       25236 :                 ATOMIC_PTR_INIT(&t->data, bat);
    3448       25236 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3449       25236 :                 bat->cs.ts = tr->tid;
    3450             :         }
    3451             : 
    3452       25236 :         if (!isNew(t) && !isTempTable(t)) {
    3453        4625 :                 bat->cs.ts = tr->ts;
    3454        4625 :                 return load_storage(tr, t, bat, t->base.id);
    3455       20611 :         } else if (bat->cs.bid) {
    3456             :                 return ok;
    3457             :         } else {
    3458       20611 :                 assert(!bat->segs);
    3459       20611 :                 if (!(bat->segs = new_segments(tr, 0)))
    3460             :                         return LOG_ERR;
    3461             : 
    3462       20611 :                 b = bat_new(TYPE_msk, t->sz, PERSISTENT);
    3463       20611 :                 if(b != NULL) {
    3464       20611 :                         bat_set_access(b, BAT_READ);
    3465       20611 :                         bat->cs.bid = temp_create(b);
    3466       20611 :                         bat_destroy(b);
    3467             :                 } else {
    3468             :                         return LOG_ERR;
    3469             :                 }
    3470       20611 :                 if (new)
    3471       27168 :                         trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
    3472             :         }
    3473             :         return ok;
    3474             : }
    3475             : 
    3476             : static int
    3477      193683 : log_segment(sql_trans *tr, segment *s, sqlid id)
    3478             : {
    3479      193683 :         sqlstore *store = tr->store;
    3480      193683 :         msk m = s->deleted;
    3481      193683 :         return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
    3482             : }
    3483             : 
    3484             : static int
    3485       96129 : log_segments(sql_trans *tr, segments *segs, sqlid id)
    3486             : {
    3487             :         /* log segments */
    3488       96129 :         lock_table(tr->store, id);
    3489      493141 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3490      397012 :                 unlock_table(tr->store, id);
    3491      397012 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3492      146961 :                         if (log_segment(tr, seg, id) != LOG_OK) {
    3493           0 :                                 unlock_table(tr->store, id);
    3494           0 :                                 return LOG_ERR;
    3495             :                         }
    3496             :                 }
    3497      397012 :                 lock_table(tr->store, id);
    3498             :         }
    3499       96129 :         unlock_table(tr->store, id);
    3500       96129 :         return LOG_OK;
    3501             : }
    3502             : 
    3503             : static int
    3504       12291 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
    3505             : {
    3506       12291 :         BAT *b;
    3507       12291 :         int ok = LOG_OK;
    3508             : 
    3509       12291 :         if (GDKinmemory(0))
    3510             :                 return LOG_OK;
    3511             : 
    3512       12259 :         b = temp_descriptor(bat->cs.bid);
    3513       12259 :         if (b == NULL)
    3514             :                 return LOG_ERR;
    3515             : 
    3516       12259 :         sqlstore *store = tr->store;
    3517       12259 :         bat_set_access(b, BAT_READ);
    3518       12259 :         if (ok == LOG_OK)
    3519       12259 :                 ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
    3520       12259 :         if (ok == LOG_OK)
    3521       12259 :                 ok = log_segments(tr, bat->segs, t->base.id);
    3522       12259 :         bat_destroy(b);
    3523       12259 :         return ok;
    3524             : }
    3525             : 
    3526             : static int
    3527       12305 : log_create_del(sql_trans *tr, sql_change *change)
    3528             : {
    3529       12305 :         int ok = LOG_OK;
    3530       12305 :         sql_table *t = (sql_table*)change->obj;
    3531             : 
    3532       12305 :         if (t->base.deleted)
    3533             :                 return ok;
    3534       12291 :         assert(!isTempTable(t));
    3535       12291 :         ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
    3536       12291 :         if (ok == LOG_OK) {
    3537       73454 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3538       61163 :                         sql_column *c = n->data;
    3539             : 
    3540       61163 :                         ok = log_create_col_(tr, c);
    3541             :                 }
    3542       12291 :                 if (t->idxs) {
    3543       18041 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3544        5750 :                                 sql_idx *i = n->data;
    3545             : 
    3546        5750 :                                 if (ATOMIC_PTR_GET(&i->data))
    3547        5738 :                                         ok = log_create_idx_(tr, i);
    3548             :                         }
    3549             :                 }
    3550             :         }
    3551             :         return ok;
    3552             : }
    3553             : 
    3554             : static int
    3555       20613 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3556             : {
    3557       20613 :         int ok = LOG_OK;
    3558       20613 :         sql_table *t = (sql_table*)change->obj;
    3559       20613 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    3560             : 
    3561       20613 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    3562         119 :                 assert(isTempTable(t));
    3563         119 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    3564         119 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    3565         119 :                 return ok;
    3566             :         }
    3567             : 
    3568       20494 :         if (!commit_ts) /* rollback handled by ? */
    3569             :                 return ok;
    3570       18640 :         ok = segments2cs(tr, dbat->segs, &dbat->cs);
    3571       18640 :         assert(ok == LOG_OK);
    3572       18640 :         if (ok != LOG_OK)
    3573             :                 return ok;
    3574       18640 :         merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
    3575       18640 :         assert(dbat->cs.ts == tr->tid);
    3576       18640 :         dbat->cs.ts = commit_ts;
    3577       18640 :         if (ok == LOG_OK) {
    3578      128279 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3579      109639 :                         sql_column *c = n->data;
    3580      109639 :                         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3581             : 
    3582      109639 :                         ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
    3583             :                 }
    3584       18640 :                 if (t->idxs) {
    3585       24403 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3586        5763 :                                 sql_idx *i = n->data;
    3587        5763 :                                 sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3588             : 
    3589        5763 :                                 if (delta)
    3590        5751 :                                         ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
    3591             :                         }
    3592             :                 }
    3593       18640 :                 if (!tr->parent)
    3594       18638 :                         t->base.new = 0;
    3595             :         }
    3596       18640 :         if (!tr->parent)
    3597       18638 :                 t->base.new = 0;
    3598             :         return ok;
    3599             : }
    3600             : 
    3601             : static int
    3602       18524 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
    3603             : {
    3604       18524 :         gdk_return ok = GDK_SUCCEED;
    3605             : 
    3606       18524 :         sqlstore *store = tr->store;
    3607       18524 :         if (!GDKinmemory(0) && b && b->cs.bid)
    3608       18521 :                 ok = log_bat_transient(store->logger, id);
    3609       18524 :         if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
    3610          25 :                 ok = log_bat_transient(store->logger, -id);
    3611       18524 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3612             : }
    3613             : 
    3614             : static int
    3615      168788 : destroy_col(sqlstore *store, sql_column *c)
    3616             : {
    3617      168788 :         (void)store;
    3618      168788 :         if (ATOMIC_PTR_GET(&c->data))
    3619      168788 :                 destroy_delta(ATOMIC_PTR_GET(&c->data), true);
    3620      168788 :         ATOMIC_PTR_SET(&c->data, NULL);
    3621      168788 :         return LOG_OK;
    3622             : }
    3623             : 
    3624             : static int /* Logs the destruction of column c's delta, but only for a transaction without a parent. */
    3625       16665 : log_destroy_col_(sql_trans *tr, sql_column *c) /* returns the result of log_destroy_delta, or LOG_OK when tr has a parent */
    3626             : {
    3627       16665 :         int ok = LOG_OK;
    3628       16665 :         assert(!isTempTable(c->t));
    3629       16665 :         if (!tr->parent) /* don't write save point commits */
    3630       16665 :                 ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id); /* c->data is not tested here; log_destroy_delta checks its delta argument for NULL */
    3631       16665 :         return ok;
    3632             : }
    3633             : 
    3634             : static int /* Callback registered by drop_col: logs the column's destruction, then hands the column to column_destroy. */
    3635          55 : log_destroy_col(sql_trans *tr, sql_change *change) /* returns the result of log_destroy_col_ */
    3636             : {
    3637          55 :         sql_column *c = (sql_column*)change->obj;
    3638          55 :         int res = log_destroy_col_(tr, c);
    3639          55 :         change->obj = NULL; /* detach the column from the change before column_destroy is called */
    3640          55 :         column_destroy(tr->store, c); /* called even when logging failed */
    3641          55 :         return res;
    3642             : }
    3643             : 
    3644             : static int /* Passes the delta attached to index i (if any) to destroy_delta and resets i->data to NULL. */
    3645       10825 : destroy_idx(sqlstore *store, sql_idx *i) /* always returns LOG_OK */
    3646             : {
    3647       10825 :         (void)store; /* unused parameter */
    3648       10825 :         if (ATOMIC_PTR_GET(&i->data))
    3649       10825 :                 destroy_delta(ATOMIC_PTR_GET(&i->data), true);
    3650       10825 :         ATOMIC_PTR_SET(&i->data, NULL); /* reset whether or not a delta was present */
    3651       10825 :         return LOG_OK;
    3652             : }
    3653             : 
    3654             : static int /* Logs the destruction of index i's delta, but only for a transaction without a parent. */
    3655        1921 : log_destroy_idx_(sql_trans *tr, sql_idx *i) /* returns LOG_OK when i has no data or tr has a parent, else the result of log_destroy_delta */
    3656             : {
    3657        1921 :         int ok = LOG_OK;
    3658        1921 :         assert(!isTempTable(i->t));
    3659        1921 :         if (ATOMIC_PTR_GET(&i->data)) { /* i->data may be NULL, in which case nothing is logged */
    3660        1859 :                 if (!tr->parent) /* don't write save point commits */
    3661        1859 :                         ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3662             :         }
    3663        1921 :         return ok;
    3664             : }
    3665             : 
    3666             : static int /* Callback registered by drop_idx: logs the index's destruction, then hands the index to idx_destroy. */
    3667         501 : log_destroy_idx(sql_trans *tr, sql_change *change) /* returns the result of log_destroy_idx_ */
    3668             : {
    3669         501 :         sql_idx *i = (sql_idx*)change->obj;
    3670         501 :         int res = log_destroy_idx_(tr, i);
    3671         501 :         change->obj = NULL; /* detach the index from the change before idx_destroy is called */
    3672         501 :         idx_destroy(tr->store, i); /* called even when logging failed */
    3673         501 :         return res;
    3674             : }
    3675             : 
    3676             : static int /* Passes the storage attached to table t (if any) to destroy_storage and resets t->data to NULL. */
    3677       26738 : destroy_del(sqlstore *store, sql_table *t) /* always returns LOG_OK */
    3678             : {
    3679       26738 :         (void)store; /* unused parameter */
    3680       26738 :         if (ATOMIC_PTR_GET(&t->data))
    3681       26728 :                 destroy_storage(ATOMIC_PTR_GET(&t->data));
    3682       26738 :         ATOMIC_PTR_SET(&t->data, NULL); /* reset whether or not a storage was present */
    3683       26738 :         return LOG_OK;
    3684             : }
    3685             : 
    3686             : static int /* Calls log_bat_transient on store->logger for the bid BAT of storage bat, keyed by id. */
    3687        3158 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id) /* returns LOG_OK, or LOG_ERR when log_bat_transient does not return GDK_SUCCEED */
    3688             : {
    3689        3158 :         gdk_return ok = GDK_SUCCEED;
    3690             : 
    3691        3158 :         sqlstore *store = tr->store;
    3692        3158 :         if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
    3693        3158 :             bat && bat->cs.bid) /* a NULL storage or a storage without bid logs nothing */
    3694        3158 :                 ok = log_bat_transient(store->logger, id);
    3695        3158 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3696             : }
    3697             : 
    3698             : static int /* Callback registered by drop_del: logs destruction of the table storage, then of every column and index. */
    3699        3158 : log_destroy_del(sql_trans *tr, sql_change *change) /* returns LOG_OK, or the first non-LOG_OK result encountered */
    3700             : {
    3701        3158 :         int ok = LOG_OK;
    3702        3158 :         sql_table *t = (sql_table*)change->obj;
    3703             : 
    3704        3158 :         assert(!isTempTable(t));
    3705        3158 :         ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
    3706        3158 :         if (ok == LOG_OK) {
    3707       19768 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) { /* the loop stops at the first failure */
    3708       16610 :                         sql_column *c = n->data;
    3709             : 
    3710       16610 :                         ok = log_destroy_col_(tr, c);
    3711             :                 }
    3712        3158 :                 if (t->idxs) { /* t->idxs may be NULL; t->columns is used without such a test */
    3713        4578 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) { /* not entered if a column already failed */
    3714        1420 :                                 sql_idx *i = n->data;
    3715             : 
    3716        1420 :                                 ok = log_destroy_idx_(tr, i);
    3717             :                         }
    3718             :                 }
    3719             :         }
    3720        3158 :         return ok;
    3721             : }
    3722             : 
    3723             : static int /* Commit callback passed by drop_del, drop_col and drop_idx; it only flags the change as handled. */
    3724        3796 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) /* always returns 0 */
    3725             : {
    3726        3796 :         (void)tr;
    3727        3796 :         (void)change; /* NOTE(review): change and commit_ts are both used below, so these two casts are redundant */
    3728        3796 :         (void)commit_ts;
    3729        3796 :         (void)oldest;
    3730        3796 :         if (commit_ts) /* a zero commit_ts leaves change->handled untouched */
    3731        3781 :                 change->handled = true;
    3732        3796 :         return 0;
    3733             : }
    3734             : 
    3735             : static int /* Registers the drop of table t's storage with transaction tr via trans_add_obj. */
    3736        3231 : drop_del(sql_trans *tr, sql_table *t) /* always returns LOG_OK: ok is never reassigned */
    3737             : {
    3738        3231 :         int ok = LOG_OK;
    3739             : 
    3740        3231 :         if (!isNew(t)) { /* nothing is registered for a table that is still new */
    3741        3231 :                 storage *bat = ATOMIC_PTR_GET(&t->data);
    3742        3298 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_destroy_del); /* no log callback for unlogged or temporary tables */
    3743             :         }
    3744        3231 :         return ok;
    3745             : }
    3746             : 
    3747             : static int /* Registers the drop of column c's delta with transaction tr via trans_add. */
    3748          63 : drop_col(sql_trans *tr, sql_column *c) /* always returns LOG_OK */
    3749             : {
    3750          63 :         assert(!isNew(c)); /* unlike drop_del, a new object is asserted against rather than skipped */
    3751          63 :         sql_delta *d = ATOMIC_PTR_GET(&c->data);
    3752          63 :         trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_destroy_col); /* no log callback for unlogged or temporary tables */
    3753          63 :         return LOG_OK;
    3754             : }
    3755             : 
    3756             : static int /* Registers the drop of index i's delta with transaction tr via trans_add. */
    3757         502 : drop_idx(sql_trans *tr, sql_idx *i) /* always returns LOG_OK */
    3758             : {
    3759         502 :         assert(!isNew(i)); /* unlike drop_del, a new object is asserted against rather than skipped */
    3760         502 :         sql_delta *d = ATOMIC_PTR_GET(&i->data);
    3761         502 :         trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_destroy_idx); /* no log callback for unlogged or temporary tables */
    3762         502 :         return LOG_OK;
    3763             : }
    3764             : 
    3765             : 
    3766             : static BUN /* Empties column storage cs: optionally swaps in empty BATs, drops the update BATs and flags cs as cleared. */
    3767      129620 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp) /* returns BATcount of the old cs->bid BAT (0 when nothing was renewed), or BUN_NONE on failure */
    3768             : {
    3769      129620 :         BAT *b;
    3770      129620 :         BUN sz = 0;
    3771             : 
    3772      129620 :         (void)tr; /* unused parameter */
    3773      129620 :         assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
    3774      129620 :         if (cs->bid && renew) { /* BATs are only replaced when renew is set and a bid exists */
    3775      129621 :                 b = quick_descriptor(cs->bid);
    3776      129612 :                 if (b) {
    3777      129612 :                         sz += BATcount(b); /* count taken before the BAT is replaced; this is the return value */
    3778      129612 :                         if (cs->st == ST_DICT) { /* ST_DICT: both cs->ebid and cs->bid are replaced */
    3779           2 :                                 bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
    3780           2 :                                 BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
    3781             : 
    3782           2 :                                 if (nebid == BID_NIL || !n) {
    3783           0 :                                         temp_destroy(nebid); /* NOTE(review): nebid may be BID_NIL and n may be NULL here - confirm temp_destroy and bat_destroy accept that */
    3784           0 :                                         bat_destroy(n);
    3785           0 :                                         return BUN_NONE;
    3786             :                                 }
    3787           2 :                                 temp_destroy(cs->ebid);
    3788           2 :                                 cs->ebid = nebid;
    3789           2 :                                 if (!temp)
    3790           2 :                                         bat_set_access(n, BAT_READ);
    3791           2 :                                 temp_destroy(cs->bid);
    3792           2 :                                 cs->bid = temp_create(n); /* create empty copy */
    3793           2 :                                 bat_destroy(n);
    3794             :                         } else {
    3795      129610 :                                 bat nbid = temp_copy(cs->bid, true, false); /* create empty copy; NOTE(review): passes false rather than temp, unlike the ST_DICT branch - confirm this is intended */
    3796             : 
    3797      129604 :                                 if (nbid == BID_NIL)
    3798             :                                         return BUN_NONE;
    3799      129604 :                                 temp_destroy(cs->bid);
    3800      129591 :                                 cs->bid = nbid;
    3801             :                         }
    3802             :                 } else {
    3803             :                         return BUN_NONE; /* no descriptor for cs->bid */
    3804             :                 }
    3805             :         }
    3806      129592 :         if (cs->uibid) { /* the update BATs are dropped regardless of renew */
    3807      129449 :                 temp_destroy(cs->uibid);
    3808      129482 :                 cs->uibid = 0;
    3809             :         }
    3810      129625 :         if (cs->uvbid) {
    3811      129482 :                 temp_destroy(cs->uvbid);
    3812      129482 :                 cs->uvbid = 0;
    3813             :         }
    3814      129625 :         cs->cleared = true; /* tr_log_cs and log_table_append test this flag */
    3815      129625 :         cs->ucnt = 0;
    3816      129625 :         return sz;
    3817             : }
    3818             : 
    3819             : static BUN /* Clears the data of column c within transaction tr. */
    3820      129474 : clear_col(sql_trans *tr, sql_column *c, bool renew) /* returns the clear_cs result, BUN_NONE on error, or BUN_NONE - 1 on an update conflict */
    3821             : {
    3822      129474 :         bool update_conflict = false;
    3823      129474 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    3824             : 
    3825      129474 :         if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL) /* the conflict flag is only passed when renew is set */
    3826           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3827      129475 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    3828      129475 :         if (odelta != delta) /* bind_col_data returned a different delta than c->data held on entry: register the change */
    3829      129480 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    3830      129467 :         if (delta) /* always true here: the NULL case returned above */
    3831      129467 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
    3832             :         return 0; /* unreachable given the NULL check above */
    3833             : }
    3834             : 
    3835             : static BUN /* Clears the data of index i within transaction tr; indexes matching the skip test below are left alone. */
    3836          21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew) /* returns 0 when skipped, the clear_cs result, BUN_NONE on error, or BUN_NONE - 1 on an update conflict */
    3837             : {
    3838          21 :         bool update_conflict = false;
    3839          21 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    3840             : 
    3841          21 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type)) /* skip: not a table, hash index on at most one column, or index type without a column */
    3842          15 :                 return 0;
    3843           6 :         if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL) /* the conflict flag is only passed when renew is set */
    3844           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3845           6 :         assert(i->t->persistence != SQL_DECLARED_TABLE);
    3846           6 :         if (odelta != delta) /* bind_idx_data returned a different delta than i->data held on entry: register the change */
    3847           6 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    3848           6 :         if (delta) /* always true here: the NULL case returned above */
    3849           6 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
    3850             :         return 0; /* unreachable given the NULL check above */
    3851             : }
    3852             : 
    3853             : static int /* Clears storage s of table t: empties its column storage and replaces its segments. */
    3854         141 : clear_storage(sql_trans *tr, sql_table *t, storage *s) /* returns LOG_OK, or LOG_ERR when clear_cs or new_segments fails */
    3855             : {
    3856         141 :         if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE) /* renew is always true here */
    3857             :                 return LOG_ERR;
    3858         141 :         if (s->segs)
    3859         141 :                 destroy_segments(s->segs);
    3860         141 :         if (!(s->segs = new_segments(tr, 0))) /* on failure s->segs is left NULL; the old segments were already passed to destroy_segments */
    3861             :                 return LOG_ERR;
    3862             :         return LOG_OK;
    3863             : }
    3864             : 
    3865             : 
    3866             : /*
    3867             :  * Clear the table. In general this means replacing the storage, but in
    3868             :  * case of earlier deletes (or inserts) by this transaction, we only mark
    3869             :  * all segments as deleted.
    3870             :  * This function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT.
    3871             :  */
    3872             : static BUN /* Clears the row storage of table t; in_transaction selects marking segments deleted instead of replacing the storage. */
    3873       41824 : clear_del(sql_trans *tr, sql_table *t, int in_transaction) /* returns LOG_OK (as BUN) on success */
    3874             : {
    3875       41824 :         int clear = !in_transaction, ok = LOG_OK;
    3876       41824 :         bool conflict = false;
    3877       41824 :         storage *bat;
    3878             : 
    3879       41875 :         if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL) /* the conflict flag is only passed when clearing */
    3880       15796 :                 return conflict?BUN_NONE-1:BUN_NONE;
    3881             : 
    3882       26028 :         if (!clear) { /* in_transaction: call delete_range from 0 up to bat->segs->t->end, under the table lock */
    3883          51 :                 lock_table(tr->store, t->base.id);
    3884          51 :                 ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
    3885          51 :                 unlock_table(tr->store, t->base.id);
    3886             :         }
    3887       26028 :         assert(t->persistence != SQL_DECLARED_TABLE);
    3888       26028 :         if (!in_transaction) /* same condition as clear; ok is still LOG_OK on this path */
    3889       25980 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    3890       26028 :         if (ok == LOG_ERR)
    3891             :                 return BUN_NONE;
    3892       26028 :         if (ok == LOG_CONFLICT)
    3893           0 :                 return BUN_NONE - 1;
    3894             :         return LOG_OK; /* callers only compare the result against BUN_NONE - 1 and above */
    3895             : }
    3896             : 
    3897             : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
    3898             : static BUN /* Clears table t in transaction tr: its row storage, then (unless in_transaction) every column and eligible index. */
    3899       41824 : clear_table(sql_trans *tr, sql_table *t) /* returns the CNT_ACTIVE count taken before clearing, or an error code */
    3900             : {
    3901       41824 :         node *n = ol_first_node(t->columns);
    3902       41824 :         sql_column *c = n->data; /* NOTE(review): n is dereferenced without a NULL check - presumably every table has at least one column; confirm */
    3903       41824 :         storage *d = tab_timestamp_storage(tr, t);
    3904       41824 :         int in_transaction, clear;
    3905       41824 :         BUN sz, clear_ok;
    3906             : 
    3907       41824 :         if (!d)
    3908             :                 return BUN_NONE;
    3909       41824 :         in_transaction = segments_in_transaction(tr, t);
    3910       41824 :         clear = !in_transaction;
    3911       41824 :         sz = count_col(tr, c, CNT_ACTIVE); /* counted on the first column, before anything is cleared */
    3912       41824 :         if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1) /* BUN_NONE - 1 and BUN_NONE are the two error codes */
    3913             :                 return clear_ok;
    3914             : 
    3915       26028 :         if (in_transaction) /* clear_del already ran delete_range; columns and indexes are not touched */
    3916             :                 return sz;
    3917             : 
    3918      155452 :         for (; n; n = n->next) { /* clear is always true from here on, since in_transaction returned above */
    3919      129475 :                 c = n->data;
    3920             : 
    3921      129475 :                 if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
    3922           0 :                         return clear_ok;
    3923             :         }
    3924       25977 :         if (t->idxs) {
    3925       25998 :                 for (n = ol_first_node(t->idxs); n; n = n->next) {
    3926          21 :                         sql_idx *ci = n->data;
    3927             : 
    3928          21 :                         if (isTable(ci->t) && idx_has_column(ci->type) && /* clear_idx repeats these two tests itself */
    3929          21 :                                 (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
    3930           0 :                                 return clear_ok;
    3931             :                 }
    3932             :         }
    3933             :         return sz;
    3934             : }
    3935             : 
    3936             : static int /* Logs column storage cs under id: the whole BAT(s) when cs is cleared, otherwise only pending updates. */
    3937      158890 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id) /* returns LOG_OK or LOG_ERR */
    3938             : {
    3939      158890 :         sqlstore *store = tr->store;
    3940      158890 :         gdk_return ok = GDK_SUCCEED;
    3941             : 
    3942      158890 :         (void) t; /* t is only referenced inside an assert below */
    3943      158890 :         (void) segs; /* unused parameter */
    3944      158890 :         if (GDKinmemory(0)) /* nothing is logged when GDKinmemory(0) holds */
    3945             :                 return LOG_OK;
    3946             : 
    3947      158883 :         if (cs->cleared) { /* cleared storage: log the bid BAT (and the ebid BAT, if any) via log_bat_persists */
    3948      155506 :                 assert(cs->ucnt == 0);
    3949      155506 :                 BAT *ins = temp_descriptor(cs->bid);
    3950      155506 :                 if (!ins)
    3951             :                         return LOG_ERR;
    3952      155506 :                 assert(!isEbat(ins));
    3953      155506 :                 bat_set_access(ins, BAT_READ);
    3954      155506 :                 ok = log_bat_persists(store->logger, ins, id);
    3955      155506 :                 bat_destroy(ins);
    3956      155506 :                 if (ok == GDK_SUCCEED && cs->ebid) {
    3957          56 :                         BAT *ins = temp_descriptor(cs->ebid); /* shadows the outer ins, which was already passed to bat_destroy */
    3958          56 :                         if (!ins)
    3959             :                                 return LOG_ERR;
    3960          56 :                         assert(!isEbat(ins));
    3961          56 :                         bat_set_access(ins, BAT_READ);
    3962          56 :                         ok = log_bat_persists(store->logger, ins, -id); /* the ebid BAT is logged under the negated id */
    3963          56 :                         bat_destroy(ins);
    3964             :                 }
    3965      155506 :                 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3966             :         }
    3967             : 
    3968        3377 :         assert(!isTempTable(t));
    3969             : 
    3970        3377 :         if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) { /* ok is still GDK_SUCCEED here, so only ucnt and uibid decide */
    3971        2795 :                 BAT *ui = temp_descriptor(cs->uibid);
    3972        2795 :                 BAT *uv = temp_descriptor(cs->uvbid);
    3973             :                 /* any updates */
    3974        2795 :                 if (ui == NULL || uv == NULL) {
    3975             :                         ok = GDK_FAIL;
    3976        2795 :                 } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
    3977        2795 :                         ok = log_delta(store->logger, ui, uv, id);
    3978        2795 :                 bat_destroy(ui); /* either pointer may be NULL here - assumes bat_destroy accepts NULL; TODO confirm */
    3979        2795 :                 bat_destroy(uv);
    3980             :         }
    3981        2795 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3982             : }
    3983             : 
    3984             : static inline int /* Calls log_bat_group_start for table t and maps GDK_SUCCEED to LOG_OK, anything else to LOG_ERR. */
    3985       57899 : tr_log_table_start(sql_trans *tr, sql_table *t) {
    3986       57899 :         sqlstore *store = tr->store;
    3987       57899 :         return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3988             : }
    3989             : 
    3990             : static inline int /* Calls log_bat_group_end for table t and maps GDK_SUCCEED to LOG_OK, anything else to LOG_ERR. */
    3991       57899 : tr_log_table_end(sql_trans *tr, sql_table *t) {
    3992       57899 :         sqlstore *store = tr->store;
    3993       57899 :         return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
    3994             : }
    3995             : 
    3996             : static int /* Logs the segments of table t that carry this transaction's tid, then the matching ranges of each column and index BAT, between tr_log_table_start and tr_log_table_end. */
    3997       57899 : log_table_append(sql_trans *tr, sql_table *t, segments *segs) /* returns LOG_OK or LOG_ERR */
    3998             : {
    3999       57899 :         sqlstore *store = tr->store;
    4000       57899 :         gdk_return ok = GDK_SUCCEED;
    4001             : 
    4002       57899 :         size_t end = segs_end(segs, tr, t);
    4003             : 
    4004       57899 :         if (tr_log_table_start(tr, t) != LOG_OK)
    4005             :                 return LOG_ERR;
    4006             : 
    4007       57899 :         size_t nr_appends = 0; /* total length of the non-deleted segments with ts == tr->tid; passed to log_bat below */
    4008             : 
    4009       57899 :         lock_table(tr->store, t->base.id); /* the lock is held while the loop condition and step read seg->next, and released inside the body */
    4010      416761 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    4011      358862 :                 unlock_table(tr->store, t->base.id);
    4012             : 
    4013      358862 :                 if (seg->ts == tr->tid && seg->end-seg->start) { /* non-empty segment with this transaction's tid */
    4014      116526 :                         if (!seg->deleted) {
    4015       46722 :                                 if (log_segment(tr, seg, t->base.id) != LOG_OK)
    4016             :                                         return LOG_ERR; /* lock is not held here; NOTE(review): tr_log_table_end is skipped on this and the other early returns - confirm callers cope */
    4017             : 
    4018       46722 :                                 nr_appends += (seg->end - seg->start);
    4019             :                         }
    4020             :                 }
    4021      358862 :                 lock_table(tr->store, t->base.id);
    4022             :         }
    4023       57899 :         unlock_table(tr->store, t->base.id);
    4024             : 
    4025      398675 :         for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) { /* stops at the first column that fails */
    4026      340776 :                 sql_column *c = n->data;
    4027      340776 :                 column_storage *cs = ATOMIC_PTR_GET(&c->data); /* NOTE(review): c->data is read as sql_delta* elsewhere in this file; presumably cs is its first member - confirm */
    4028             : 
    4029      340776 :                 if (cs->cleared) { /* a cleared column is logged as a whole by tr_log_cs */
    4030           3 :                         ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4031           3 :                         continue;
    4032             :                 }
    4033             : 
    4034      340773 :                 lock_table(tr->store, t->base.id);
    4035      340773 :                 if (!cs->cleared) { /* re-tested after taking the table lock */
    4036     2415590 :                         for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4037     2074817 :                                 unlock_table(tr->store, t->base.id);
    4038     2074817 :                                 if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4039             :                                         /* append col*/
    4040      261769 :                                         BAT *ins = temp_descriptor(cs->bid);
    4041      261769 :                                         if (ins == NULL)
    4042             :                                                 return LOG_ERR;
    4043      261769 :                                         assert(BATcount(ins) >= cur->end);
    4044      261769 :                                         ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends); /* logs the range starting at cur->start with length cur->end - cur->start */
    4045      261769 :                                         bat_destroy(ins);
    4046             :                                 }
    4047     2074817 :                                 lock_table(tr->store, t->base.id);
    4048             :                         }
    4049             :                 }
    4050      340773 :                 unlock_table(tr->store, t->base.id);
    4051             : 
    4052      340773 :                 if (ok == GDK_SUCCEED && cs->ebid) { /* the ebid BAT is logged under the negated column id */
    4053          19 :                         BAT *ins = temp_descriptor(cs->ebid);
    4054          19 :                         if (ins == NULL)
    4055             :                                 return LOG_ERR;
    4056          19 :                         if (BATcount(ins) > ins->batInserted) /* only the entries from batInserted up to BATcount are logged */
    4057          17 :                                 ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
    4058          19 :                         BATcommit(ins, BATcount(ins)); /* called whether or not log_bat ran or succeeded */
    4059          19 :                         bat_destroy(ins);
    4060             :                 }
    4061             :         }
    4062             : 
    4063       57899 :         if (t->idxs) {
    4064       63097 :                 for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
    4065        5198 :                         sql_idx *i = n->data;
    4066             : 
    4067        5198 :                         if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type)) /* skip: hash index on at most one column, or index type without a column */
    4068        4420 :                                 continue;
    4069         778 :                         column_storage *cs = ATOMIC_PTR_GET(&i->data);
    4070             : 
    4071         778 :                         if (cs) { /* unlike the column loop, a NULL storage is tolerated here */
    4072         778 :                                 if (cs->cleared) {
    4073           0 :                                         ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4074           0 :                                         continue;
    4075             :                                 }
    4076             : 
    4077         778 :                                 lock_table(tr->store, t->base.id); /* same lock hand-over pattern as the column loop */
    4078        2387 :                                 for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4079        1609 :                                         unlock_table(tr->store, t->base.id);
    4080        1609 :                                         if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4081             :                                                 /* append idx */
    4082         730 :                                                 BAT *ins = temp_descriptor(cs->bid);
    4083         730 :                                                 if (ins == NULL)
    4084             :                                                         return LOG_ERR;
    4085         730 :                                                 assert(BATcount(ins) >= cur->end);
    4086         730 :                                                 ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
    4087         730 :                                                 bat_destroy(ins);
    4088             :                                         }
    4089        1609 :                                         lock_table(tr->store, t->base.id);
    4090             :                                 }
    4091         778 :                                 unlock_table(tr->store, t->base.id);
    4092             :                         }
    4093             :                 }
    4094             :         }
    4095             : 
    4096       57899 :         if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK) /* tr_log_table_end is only called when everything above succeeded */
    4097           0 :                 return LOG_ERR;
    4098             : 
    4099             :         return LOG_OK;
    4100             : }
    4101             : 
    4102             : static int /* Logs storage s of table t: tr_log_cs when s->cs is cleared, then log_segments, then log_table_append when it is not cleared. */
    4103       83870 : log_storage(sql_trans *tr, sql_table *t, storage *s) /* returns LOG_OK, or the first non-LOG_OK result */
    4104             : {
    4105       83870 :         int ok = LOG_OK;
    4106       83870 :         bool cleared = s->cs.cleared; /* sampled once; both branches below use this copy */
    4107       83870 :         if (ok == LOG_OK && cleared) /* ok is trivially LOG_OK at this point */
    4108       25971 :                 ok =  tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
    4109       25971 :         if (ok == LOG_OK)
    4110       83870 :                 ok = log_segments(tr, s->segs, t->base.id);
    4111       83870 :         if (ok == LOG_OK && !cleared)
    4112       57899 :                 ok = log_table_append(tr, t, s->segs);
    4113       83870 :         return ok;
    4114             : }
    4115             : 
    4116             : static void /* Applies the pending updates (uibid/uvbid) of cs to its bid BAT via BATreplace, then resets the update BATs and flags. */
    4117      302063 : merge_cs( column_storage *cs, const char* caller) /* caller is only used in the GDKfatal message */
    4118             : {
    4119      302063 :         if (cs->bid && cs->ucnt) { /* the merge itself only happens when there are recorded updates */
    4120        2803 :                 BAT *cur = temp_descriptor(cs->bid);
    4121        2803 :                 BAT *ui = temp_descriptor(cs->uibid);
    4122        2803 :                 BAT *uv = temp_descriptor(cs->uvbid);
    4123             : 
    4124        2803 :                 if (!cur || !ui || !uv) {
    4125           0 :                         bat_destroy(ui); /* any of the three may be NULL here - assumes bat_destroy accepts NULL; TODO confirm */
    4126           0 :                         bat_destroy(uv);
    4127           0 :                         bat_destroy(cur);
    4128           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4129             :                         return;
    4130             :                 }
    4131        2803 :                 assert(BATcount(ui) == BATcount(uv));
    4132             : 
    4133             :                 /* any updates */
    4134        2803 :                 assert(!isEbat(cur));
    4135        2803 :                 if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
    4136           0 :                         bat_destroy(ui);
    4137           0 :                         bat_destroy(uv);
    4138           0 :                         bat_destroy(cur);
    4139           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4140             :                         return;
    4141             :                 }
    4142             :                 /* cleanup the old deltas */
    4143        2803 :                 temp_destroy(cs->uibid);
    4144        2803 :                 temp_destroy(cs->uvbid);
    4145        2803 :                 cs->uibid = e_bat(TYPE_oid);
    4146        2803 :                 cs->uvbid = e_bat(cur->ttype); /* new value BAT has the same type as the merged column */
    4147        2803 :                 assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
    4148        2803 :                 cs->ucnt = 0;
    4149        2803 :                 bat_destroy(ui);
    4150        2803 :                 bat_destroy(uv);
    4151        2803 :                 bat_destroy(cur);
    4152             :         }
    4153      302063 :         cs->cleared = false; /* both flags are set even when there was nothing to merge */
    4154      302063 :         cs->merged = true;
    4155      302063 :         return;
    4156             : }
    4157             : 
    4158             : static void /* Merges delta obat via merge_cs, first recursing into obat->next when obat is not yet flagged merged. */
    4159      260603 : merge_delta( sql_delta *obat)
    4160             : {
    4161      260603 :         if (obat && obat->next && !obat->cs.merged) /* the next delta in the chain is merged before this one */
    4162       51927 :                 merge_delta(obat->next);
    4163      260603 :         merge_cs(&obat->cs, __func__); /* NOTE(review): obat is tested for NULL above but used unconditionally here - confirm callers never pass NULL */
    4164      260603 : }
    4165             : 
    4166             : static void /* Merges the column storage of tdb via merge_cs, then passes tdb->next (if any) to destroy_storage. */
    4167       41460 : merge_storage(storage *tdb)
    4168             : {
    4169       41460 :         merge_cs(&tdb->cs, __func__);
    4170             : 
    4171       41460 :         if (tdb->next) { /* unlike merge_delta, the next storage is not merged, only destroyed */
    4172         420 :                 destroy_storage(tdb->next);
    4173         420 :                 tdb->next = NULL;
    4174             :         }
    4175       41460 : }
    4176             : /* Savepoint commit of a delta: when the head delta and its successor both carry commit_ts, the head's contents are moved into the successor and the head struct (then holding the successor's old contents) is destroyed. Returns the delta the caller should install as head: the successor after a move, otherwise the unchanged input. */
    4177             : static sql_delta *
    4178           1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
    4179             : {
    4180             :         /* commit, i.e. copy back to the parent transaction */
    4181           1 :         if (delta && delta->cs.ts == commit_ts && delta->next) {
    4182           1 :                 sql_delta *od = delta->next;
    4183           1 :                 if (od->cs.ts == commit_ts) {
    4184           0 :                         sql_delta t = *od, *n = od->next; /* save od's contents and its successor */
    4185           0 :                         *od = *delta; /* od now holds the committed contents ... */
    4186           0 :                         od->next = n; /* ... but keeps its own successor */
    4187           0 :                         *delta = t; /* the delta struct takes over od's old contents */
    4188           0 :                         delta->next = NULL; /* detach from the chain before destroying */
    4189           0 :                         destroy_delta(delta, true);
    4190           0 :                         return od;
    4191             :                 }
    4192             :         }
    4193             :         return delta;
    4194             : }
    4195             : /* Log hook for column updates. For a deleted table the change is only marked handled; otherwise, when tr has no parent, the column's cs is logged through tr_log_cs(). Returns LOG_OK or the result of tr_log_cs(). */
    4196             : static int
    4197      132890 : log_update_col( sql_trans *tr, sql_change *change)
    4198             : {
    4199      132890 :         sql_column *c = (sql_column*)change->obj;
    4200      132890 :         assert(!isTempTable(c->t));
    4201             : 
    4202      132890 :         if (isDeleted(c->t)) {
    4203           0 :                 change->handled = true;
    4204           0 :                 return LOG_OK;
    4205             :         }
    4206             : 
    4207      132890 :         if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits; the isDeleted test repeats the check that already returned above */
    4208      132890 :                 storage *s = ATOMIC_PTR_GET(&c->t->data);
    4209      132890 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    4210      132890 :                 return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
    4211             :         }
    4212             :         return LOG_OK;
    4213             : }
    4214             : /* Cleanup hook for a rolled-back column/index delta held in change->data. Returns 1 once the delta has been destroyed (its ts is below 'oldest'), 0 while it has to be kept. */
    4215             : static int
    4216         217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
    4217             : {
    4218         217 :         sqlstore *store = Store;
    4219             : 
    4220         217 :         sql_delta *d = (sql_delta*)change->data;
    4221         217 :         if (d->cs.ts < oldest) {
    4222          82 :                 destroy_delta(d, false);
    4223          82 :                 if (change->commit == &commit_update_idx) /* the commit hook tells whether change->obj is a sql_idx or a sql_column */
    4224           2 :                         table_destroy(store, ((sql_idx*)change->obj)->t);
    4225             :                 else
    4226          80 :                         table_destroy(store, ((sql_column*)change->obj)->t);
    4227          82 :                 return 1;
    4228             :         }
    4229         135 :         if (d->cs.ts > TRANSACTION_ID_BASE) /* presumably ts is still a transaction id rather than a commit timestamp -- confirm */
    4230          82 :                 d->cs.ts = store_get_timestamp(store) + 1; /* restamp with the current store timestamp + 1 */
    4231             :         return 0;
    4232             : }
    4233             : /* Cleanup hook for a rolled-back table storage held in change->data (change->obj is the sql_table). Returns 1 once the storage has been destroyed (its ts is below 'oldest'), 0 while it has to be kept. */
    4234             : static int
    4235           9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
    4236             : {
    4237           9 :         sqlstore *store = Store;
    4238             : 
    4239           9 :         storage *d = (storage*)change->data;
    4240           9 :         if (d->cs.ts < oldest) {
    4241           3 :                 destroy_storage(d);
    4242           3 :                 table_destroy(store, (sql_table*)change->obj);
    4243           3 :                 return 1;
    4244             :         }
    4245           6 :         if (d->cs.ts > TRANSACTION_ID_BASE) /* presumably ts is still a transaction id rather than a commit timestamp -- confirm */
    4246           3 :                 d->cs.ts = store_get_timestamp(store) + 1; /* restamp with the current store timestamp + 1 */
    4247             :         return 0;
    4248             : }
    4249             : /* Shared commit/rollback logic for column and index deltas; 'data' points at the head of the object's delta chain. commit_ts == 0 means rollback: the delta in change->data is unlinked and left to tc_gc_rollbacked. Otherwise the head delta is stamped with commit_ts and either merged (tr has no parent) or folded into the parent's delta (savepoint). Returns LOG_OK, or LOG_ERR when clearing a CA_DELETE/CA_DROP table fails. */
    4250             : static int
    4251      133008 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
    4252             : {
    4253      133008 :         (void) type; // TODO transaction_layer_revamp remove if remains unused
    4254             : 
    4255      133008 :         sql_delta *delta = ATOMIC_PTR_GET(data);
    4256             : 
    4257      133008 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) { /* asserted below to be a temp table: its content is cleared instead of committed */
    4258           3 :                 int ok = LOG_OK;
    4259           3 :                 assert(isTempTable(t));
    4260           3 :                 if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
    4261           0 :                         ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
    4262           3 :                 if (!tr->parent)
    4263           0 :                         t->base.new = base->new = 0;
    4264           3 :                 change->handled = true;
    4265           3 :                 return ok;
    4266             :         }
    4267             : 
    4268      133005 :         if (commit_ts)
    4269      132923 :                 delta->cs.ts = commit_ts;
    4270      133005 :         if (!commit_ts) { /* rollback */
    4271          82 :                 sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
    4272             : 
    4273          82 :                 if (change->ts && t->base.new) /* handled by create col */
    4274             :                         return LOG_OK;
    4275          82 :                 if (o != d) { /* d is not the head: walk the chain to d's predecessor */
    4276           0 :                         while(o && o->next != d)
    4277             :                                 o = o->next;
    4278             :                 }
    4279          82 :                 if (o == ATOMIC_PTR_GET(data)) /* NOTE(review): also true when d's predecessor is the head itself; the head is then replaced by d->next -- confirm this is intended */
    4280          82 :                         ATOMIC_PTR_SET(data, d->next);
    4281             :                 else
    4282           0 :                         o->next = d->next; /* NOTE(review): o is NULL here if d was not found in the chain -- confirm that cannot happen */
    4283          82 :                 d->next = NULL;
    4284          82 :                 change->cleanup = &tc_gc_rollbacked; /* the unlinked delta is released later by the cleanup hook */
    4285      132923 :         } else if (!tr->parent) {
    4286             :                 /* merge deltas */
    4287      359761 :                 while (delta && delta->cs.ts > oldest) /* skip deltas with a timestamp above 'oldest' */
    4288      226839 :                         delta = delta->next;
    4289      132922 :                 if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
    4290       31527 :                         lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
    4291       31527 :                         merge_delta(delta);
    4292       31527 :                         unlock_column(tr->store, base->id);
    4293             :                 }
    4294           1 :         } else if (tr->parent) /* move delta into older and cleanup current save points; the condition is always true here since the previous branch took !tr->parent */
    4295           1 :                 ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
    4296             :         return LOG_OK;
    4297             : }
    4298             : /* Commit hook for column updates: unpacks the sql_column in change->obj and delegates to commit_update_delta(). Returns LOG_OK without doing anything when the change is already handled or the table is deleted. */
    4299             : static int
    4300      132980 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4301             : {
    4302             : 
    4303      132980 :         sql_column *c = (sql_column*)change->obj;
    4304      132980 :         sql_base* base = &c->base;
    4305      132980 :         sql_table* t = c->t;
    4306      132980 :         ATOMIC_PTR_TYPE* data = &c->data;
    4307      132980 :         int type = c->type.type->localtype; /* passed through; commit_update_delta currently ignores it */
    4308             : 
    4309      132980 :         if (change->handled || isDeleted(c->t))
    4310             :                 return LOG_OK;
    4311             : 
    4312      132980 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4313             : }
    4314             : /* Log hook for index updates. For a deleted table the change is only marked handled; otherwise, when tr has no parent, the index's cs is logged through tr_log_cs(). Returns LOG_OK or the result of tr_log_cs(). */
    4315             : static int
    4316          26 : log_update_idx( sql_trans *tr, sql_change *change)
    4317             : {
    4318          26 :         sql_idx *i = (sql_idx*)change->obj;
    4319          26 :         assert(!isTempTable(i->t));
    4320             : 
    4321          26 :         if (isDeleted(i->t)) {
    4322           0 :                 change->handled = true;
    4323           0 :                 return LOG_OK;
    4324             :         }
    4325             : 
    4326          26 :         if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits; the isDeleted test repeats the check that already returned above */
    4327          26 :                 storage *s = ATOMIC_PTR_GET(&i->t->data);
    4328          26 :                 sql_delta *d = ATOMIC_PTR_GET(&i->data);
    4329          26 :                 return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
    4330             :         }
    4331             :         return LOG_OK;
    4332             : }
    4333             : /* Commit hook for index updates: unpacks the sql_idx in change->obj and delegates to commit_update_delta(). Returns LOG_OK without doing anything when the change is already handled or the table is deleted. */
    4334             : static int
    4335          28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4336             : {
    4337          28 :         sql_idx *i = (sql_idx*)change->obj;
    4338          28 :         sql_base* base = &i->base;
    4339          28 :         sql_table* t = i->t;
    4340          28 :         ATOMIC_PTR_TYPE* data = &i->data;
    4341          28 :         int type = (oid_index(i->type))?TYPE_oid:TYPE_lng; /* passed through; commit_update_delta currently ignores it */
    4342             : 
    4343          28 :         if (change->handled || isDeleted(i->t))
    4344             :                 return LOG_OK;
    4345             : 
    4346          28 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4347             : }
    4348             : /* Savepoint commit of a table storage: when the head storage and its successor both carry commit_ts, the head's contents are moved into the successor and the head struct (then holding the successor's old contents) is destroyed. Returns the storage the caller should install as head. */
    4349             : static storage *
    4350          26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
    4351             : {
    4352          26 :         if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
    4353           0 :                 storage *od = dbat->next;
    4354           0 :                 if (od->cs.ts == commit_ts) {
    4355           0 :                         storage t = *od, *n = od->next; /* save od's contents and its successor */
    4356           0 :                         *od = *dbat; /* od now holds the committed contents ... */
    4357           0 :                         od->next = n; /* ... but keeps its own successor */
    4358           0 :                         *dbat = t; /* the dbat struct takes over od's old contents */
    4359           0 :                         dbat->next = NULL; /* detach from the chain before destroying */
    4360           0 :                         destroy_storage(dbat);
    4361           0 :                         return od;
    4362             :                 }
    4363             :         }
    4364             :         return dbat;
    4365             : }
    4366             : /* Log hook for table storage changes. For a deleted table the change is only marked handled; otherwise, when tr has no parent, the table's storage is logged through log_storage(). Returns LOG_OK or the result of log_storage(). */
    4367             : static int
    4368       83870 : log_update_del( sql_trans *tr, sql_change *change)
    4369             : {
    4370       83870 :         sql_table *t = (sql_table*)change->obj;
    4371       83870 :         assert(!isTempTable(t));
    4372             : 
    4373       83870 :         if (isDeleted(t)) {
    4374           0 :                 change->handled = true;
    4375           0 :                 return LOG_OK;
    4376             :         }
    4377             : 
    4378       83870 :         if (!isDeleted(t) && !tr->parent) /* don't write save point commits; the isDeleted test repeats the check that already returned above */
    4379       83870 :                 return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
    4380             :         return LOG_OK;
    4381             : }
    4382             : /* Commit hook for table storage (segments/deletes). commit_ts == 0 is a rollback; with a non-zero commit_ts, tr->parent separates a savepoint commit from a top-level commit. The table lock is held around everything except the CA_DELETE/CA_DROP path. Returns LOG_OK or the status of the failing helper (clear_storage / segments2cs). */
    4383             : static int
    4384       88386 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4385             : {
    4386       88386 :         int ok = LOG_OK;
    4387       88386 :         sql_table *t = (sql_table*)change->obj;
    4388       88386 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    4389             : 
    4390       88386 :         if (change->handled || isDeleted(t))
    4391             :                 return ok;
    4392             : 
    4393       88386 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) { /* asserted below to be a temp table: its storage is cleared instead of committed */
    4394          22 :                 assert(isTempTable(t));
    4395          22 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    4396          22 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    4397          22 :                 change->handled = true;
    4398          22 :                 return ok;
    4399             :         }
    4400             : 
    4401       88364 :         lock_table(tr->store, t->base.id); /* every path below must reach an unlock_table() */
    4402       88364 :         if (!commit_ts) { /* rollback */
    4403        4194 :                 if (dbat->cs.ts == tr->tid) { /* head storage is stamped with this transaction's id: unlink the storage recorded in change->data */
    4404           6 :                         if (change->ts && t->base.new) { /* handled by the create table */
    4405           3 :                                 unlock_table(tr->store, t->base.id);
    4406           3 :                                 return ok;
    4407             :                         }
    4408           3 :                         storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
    4409             : 
    4410           3 :                         if (o != d) { /* d is not the head: walk the chain to d's predecessor */
    4411           0 :                                 while(o && o->next != d)
    4412             :                                         o = o->next;
    4413             :                         }
    4414           3 :                         if (o == ATOMIC_PTR_GET(&t->data)) {
    4415           3 :                                 assert(d->next); /* the table must keep a storage after the unlink */
    4416           3 :                                 ATOMIC_PTR_SET(&t->data, d->next);
    4417             :                         } else
    4418           0 :                                 o->next = d->next; /* NOTE(review): o is NULL here if d was not found in the chain -- confirm that cannot happen */
    4419           3 :                         d->next = NULL;
    4420           3 :                         change->cleanup = &tc_gc_rollbacked_storage; /* the unlinked storage is released later by the cleanup hook */
    4421             :                 } else
    4422        4188 :                         rollback_segments(dbat->segs, tr, change, oldest);
    4423       84170 :         } else if (ok == LOG_OK && !tr->parent) { /* top-level commit; ok is still LOG_OK at this point */
    4424       84144 :                 if (dbat->cs.ts == tr->tid) /* cleared table */
    4425       25973 :                         dbat->cs.ts = commit_ts;
    4426             : 
    4427       84144 :                 ok = segments2cs(tr, dbat->segs, &dbat->cs);
    4428       84144 :                 if (ok == LOG_OK) {
    4429       84144 :                         merge_segments(dbat, tr, change, commit_ts, oldest);
    4430       84144 :                         if (oldest == commit_ts) /* presumably no older transaction is still active, so the storage can be merged right away -- confirm */
    4431       41460 :                                 merge_storage(dbat);
    4432             :                 }
    4433       84144 :                 if (dbat) /* dbat was already dereferenced above, so this test cannot fail here */
    4434       84144 :                         dbat->cs.cleared = false;
    4435          26 :         } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
    4436          26 :                 merge_segments(dbat, tr, change, commit_ts, oldest);
    4437          26 :                 ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
    4438          26 :                 storage *s = change->data;
    4439          26 :                 if (s->cs.ts == tr->tid) /* storage still stamped with the transaction id: replace it by commit_ts */
    4440           0 :                         s->cs.ts = commit_ts;
    4441             :         }
    4442       88361 :         unlock_table(tr->store, t->base.id);
    4443       88361 :         return ok;
    4444             : }
    4445             : /* Cleanup hook for a column change. Returns 1 when the change is done with (the column reference is released via column_destroy(), or change->obj was already NULL) and 0 when cleanup has to be retried later. When the delta in change->data has a successor and is old enough, the successor is destroyed and the delta is merged under the column lock. */
    4446             : /* only rollback (content version) case for now */
    4447             : static int
    4448         215 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
    4449             : {
    4450         215 :         sqlstore *store = Store;
    4451         215 :         sql_column *c = (sql_column*)change->obj;
    4452             : 
    4453         215 :         if (!c) /* cleaned earlier */
    4454             :                 return 1;
    4455             : 
    4456         160 :         if (change->handled || isDeleted(c->t)) {
    4457           0 :                 column_destroy(store, c);
    4458           0 :                 return 1;
    4459             :         }
    4460             : 
    4461             :         /* savepoint commit (did it merge ?) */
    4462         160 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4463           0 :                 column_destroy(store, c);
    4464           0 :                 return 1;
    4465             :         }
    4466         160 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4467             :                 return 0;
    4468         159 :         sql_delta *d = (sql_delta*)change->data;
    4469         159 :         if (d && d->next) {
    4470             : 
    4471          62 :                 if (d->cs.ts > oldest)
    4472             :                         return LOG_OK; /* cannot cleanup yet; NOTE(review): LOG_OK is used as the "retry" value where the path above returns 0 -- presumably LOG_OK == 0, confirm */
    4473             : 
    4474             :                 // d is oldest reachable delta
    4475          59 :                 if (d->next) // Unreachable can immediately be destroyed.
    4476          59 :                         destroy_delta(d->next, true);
    4477             : 
    4478          59 :                 d->next = NULL;
    4479          59 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4480          59 :                 merge_delta(d);
    4481          59 :                 unlock_column(store, c->base.id);
    4482             :         }
    4483         156 :         column_destroy(store, c);
    4484         156 :         return 1;
    4485             : }
    4486             : /* Cleanup hook for a column update. Same flow as tc_gc_col, except that the reference released on every "done" path is the column's table (table_destroy) rather than the column. Returns 1 when done, 0 when cleanup has to be retried later. */
    4487             : static int
    4488      874388 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
    4489             : {
    4490      874388 :         sqlstore *store = Store;
    4491      874388 :         sql_column *c = (sql_column*)change->obj;
    4492             : 
    4493      874388 :         if (!c) /* cleaned earlier */
    4494             :                 return 1;
    4495             : 
    4496      874388 :         if (change->handled || isDeleted(c->t)) {
    4497           3 :                 table_destroy(store, c->t);
    4498           3 :                 return 1;
    4499             :         }
    4500             : 
    4501             :         /* savepoint commit (did it merge ?) */
    4502      874385 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4503       71883 :                 table_destroy(store, c->t);
    4504       71883 :                 return 1;
    4505             :         }
    4506      802502 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4507             :                 return 0;
    4508      802501 :         sql_delta *d = (sql_delta*)change->data;
    4509      802501 :         if (d && d->next) {
    4510             : 
    4511      802501 :                 if (d->cs.ts > oldest)
    4512             :                         return LOG_OK; /* cannot cleanup yet; NOTE(review): LOG_OK is used as the "retry" value where the path above returns 0 -- presumably LOG_OK == 0, confirm */
    4513             : 
    4514             :                 // d is oldest reachable delta
    4515       60954 :                 if (d->next) // Unreachable can immediately be destroyed.
    4516       60954 :                         destroy_delta(d->next, true);
    4517             : 
    4518       60954 :                 d->next = NULL;
    4519       60954 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4520       60954 :                 merge_delta(d);
    4521       60954 :                 unlock_column(store, c->base.id);
    4522             :         }
    4523       60954 :         table_destroy(store, c->t);
    4524       60954 :         return 1;
    4525             : }
    4526             : /* Cleanup hook for an index change. Returns 1 when the change is done with (the index reference is released via idx_destroy(), or change->obj was already NULL) and 0 when cleanup has to be retried later. When the delta in change->data has a successor and is old enough, the successor is destroyed and the delta is merged under the column lock taken on the index id. */
    4527             : static int
    4528        1133 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
    4529             : {
    4530        1133 :         sqlstore *store = Store;
    4531        1133 :         sql_idx *i = (sql_idx*)change->obj;
    4532             : 
    4533        1133 :         if (!i) /* cleaned earlier */
    4534             :                 return 1;
    4535             : 
    4536         632 :         if (change->handled || isDeleted(i->t)) {
    4537           0 :                 idx_destroy(store, i);
    4538           0 :                 return 1;
    4539             :         }
    4540             : 
    4541             :         /* savepoint commit (did it merge ?) */
    4542         632 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4543           0 :                 idx_destroy(store, i);
    4544           0 :                 return 1;
    4545             :         }
    4546         632 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4547             :                 return 0;
    4548         632 :         sql_delta *d = (sql_delta*)change->data;
    4549         632 :         if (d->next) { /* unlike tc_gc_col, d itself is not NULL-checked here */
    4550             : 
    4551           0 :                 if (d->cs.ts > oldest)
    4552             :                         return LOG_OK; /* cannot cleanup yet; NOTE(review): LOG_OK is used as the "retry" value where the path above returns 0 -- presumably LOG_OK == 0, confirm */
    4553             : 
    4554             :                 // d is oldest reachable delta
    4555           0 :                 if (d->next) // Unreachable can immediately be destroyed.
    4556           0 :                         destroy_delta(d->next, true);
    4557             : 
    4558           0 :                 d->next = NULL;
    4559           0 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4560           0 :                 merge_delta(d);
    4561           0 :                 unlock_column(store, i->base.id);
    4562             :         }
    4563         632 :         idx_destroy(store, i);
    4564         632 :         return 1;
    4565             : }
    4566             : /* Cleanup hook for an index update. Same flow as tc_gc_idx, except that the reference released on every "done" path is the index's table (table_destroy) rather than the index. Returns 1 when done, 0 when cleanup has to be retried later. */
    4567             : static int
    4568          26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
    4569             : {
    4570          26 :         sqlstore *store = Store;
    4571          26 :         sql_idx *i = (sql_idx*)change->obj;
    4572             : 
    4573          26 :         if (!i) /* cleaned earlier */
    4574             :                 return 1;
    4575             : 
    4576          26 :         if (change->handled || isDeleted(i->t)) {
    4577           0 :                 table_destroy(store, i->t);
    4578           0 :                 return 1;
    4579             :         }
    4580             : 
    4581             :         /* savepoint commit (did it merge ?) */
    4582          26 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4583           0 :                 table_destroy(store, i->t);
    4584           0 :                 return 1;
    4585             :         }
    4586          26 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4587             :                 return 0;
    4588          26 :         sql_delta *d = (sql_delta*)change->data;
    4589          26 :         if (d->next) { /* unlike tc_gc_upd_col, d itself is not NULL-checked here */
    4590             : 
    4591          26 :                 if (d->cs.ts > oldest)
    4592             :                         return LOG_OK; /* cannot cleanup yet; NOTE(review): LOG_OK is used as the "retry" value where the path above returns 0 -- presumably LOG_OK == 0, confirm */
    4593             : 
    4594             :                 // d is oldest reachable delta
    4595          26 :                 if (d->next) // Unreachable can immediately be destroyed.
    4596          26 :                         destroy_delta(d->next, true);
    4597             : 
    4598          26 :                 d->next = NULL;
    4599          26 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4600          26 :                 merge_delta(d);
    4601          26 :                 unlock_column(store, i->base.id);
    4602             :         }
    4603          26 :         table_destroy(store, i->t);
    4604          26 :         return 1;
    4605             : }
    4606             : /* Cleanup hook for a table storage change. Returns 1 when the change is done with (the table reference is released via table_destroy()) and 0 when cleanup has to be retried later. When the storage in change->data has a successor and is old enough, the successor is destroyed and unlinked. */
    4607             : static int
    4608      247816 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
    4609             : {
    4610      247816 :         sqlstore *store = Store;
    4611      247816 :         sql_table *t = (sql_table*)change->obj;
    4612             : 
    4613      247816 :         if (change->handled || isDeleted(t)) {
    4614        3404 :                 table_destroy(store, t);
    4615        3404 :                 return 1;
    4616             :         }
    4617             :         /* savepoint commit (did it merge ?) */
    4618      244412 :         if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
    4619       14396 :                 table_destroy(store, t);
    4620       14396 :                 return 1;
    4621             :         }
    4622      230016 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4623             :                 return 0;
    4624      229988 :         storage *d = (storage*)change->data;
    4625      229988 :         if (d->next) {
    4626      158812 :                 if (d->cs.ts > oldest)
    4627             :                         return LOG_OK; /* cannot cleanup yet; NOTE(review): LOG_OK is used as the "retry" value where the path above returns 0 -- presumably LOG_OK == 0, confirm */
    4628             : 
    4629       11158 :                 destroy_storage(d->next); /* d is old enough: whatever is chained behind it is dropped */
    4630       11158 :                 d->next = NULL;
    4631             :         }
    4632       82334 :         table_destroy(store, t);
    4633       82334 :         return 1;
    4634             : }
    4635             : /* Record 'nr' consecutive positions starting at 'slot'. When this single range covers all 'total' rows and no offsets BAT exists yet, only *offset is set; otherwise the positions slot..slot+nr-1 are appended to *offsets, which is created on demand with COLnew(..., total, ...). Returns LOG_OK, or LOG_ERR when the BAT cannot be created. */
    4636             : static int
    4637       30397 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
    4638             : {
    4639       30397 :         if (nr == 0)
    4640             :                 return LOG_OK;
    4641       30397 :         assert (nr > 0);
    4642       30397 :         if ((!offsets || !*offsets) && nr == total) { /* one contiguous range: a start offset is enough */
    4643       30368 :                 *offset = slot;
    4644       30368 :                 return LOG_OK;
    4645             :         }
    4646          29 :         if (!*offsets) { /* NOTE(review): offsets is dereferenced here although the test above allows offsets == NULL (reachable when nr != total) -- confirm callers always pass a non-NULL pointer */
    4647           7 :                 *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
    4648           7 :                 if (!*offsets)
    4649             :                         return LOG_ERR;
    4650             :         }
    4651          29 :         oid *restrict dst = Tloc(*offsets, BATcount(*offsets)); /* write position: just past the entries already in the BAT */
    4652       16168 :         for(size_t i = 0; i < nr; i++) /* no capacity check: assumes the BAT has room for nr more entries -- TODO confirm */
    4653       16139 :                 dst[i] = slot + i;
    4654          29 :         (*offsets)->batCount += nr; /* count and dirty flag are updated by hand since the tail was written directly */
    4655          29 :         (*offsets)->theap->dirty = true;
    4656          29 :         return LOG_OK;
    4657             : }
    4658             : 
    4659             : static int
    4660       30375 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4661             : {
    4662       30375 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4663       30375 :         assert(s->segs);
    4664       30375 :         ulng oldest = store_oldest(tr->store, NULL);
    4665       30375 :         BUN slot = 0;
    4666       30375 :         size_t total = cnt;
    4667             : 
    4668       30375 :         if (!locked)
    4669       30375 :                 lock_table(tr->store, t->base.id);
    4670             :         /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
    4671       60882 :         for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4672       30509 :                 if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* re-use old deleted or rolledback append */
    4673          35 :                         if ((seg->end - seg->start) >= cnt) {
    4674             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4675          13 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4676           2 :                                         slot = p->end;
    4677           2 :                                         p->end += cnt;
    4678           2 :                                         seg->start += cnt;
    4679           2 :                                         if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
    4680             :                                                 ok = LOG_ERR;
    4681             :                                                 break;
    4682             :                                         }
    4683           2 :                                         cnt = 0;
    4684           2 :                                         break;
    4685             :                                 }
    4686             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4687          11 :                                 size_t rcnt = seg->end - seg->start;
    4688          11 :                                 if (rcnt > cnt)
    4689             :                                         rcnt = cnt;
    4690          11 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
    4691             :                                         ok = LOG_ERR;
    4692             :                                         break;
    4693             :                                 }
    4694             :                         }
    4695          33 :                         seg->ts = tr->tid;
    4696          33 :                         seg->deleted = false;
    4697          33 :                         slot = seg->start;
    4698          33 :                         if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
    4699             :                                 ok = LOG_ERR;
    4700             :                                 break;
    4701             :                         }
    4702          33 :                         cnt -= (seg->end - seg->start);
    4703             :                 }
    4704             :         }
    4705       30375 :         if (ok == LOG_OK && cnt) {
    4706       30362 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4707       29247 :                         slot = s->segs->t->end;
    4708       29247 :                         s->segs->t->end += cnt;
    4709             :                 } else {
    4710        1115 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4711             :                                 ok = LOG_ERR;
    4712             :                         } else {
    4713        1115 :                                 if (!s->segs->h)
    4714           0 :                                         s->segs->h = s->segs->t;
    4715        1115 :                                 slot = s->segs->t->start;
    4716             :                         }
    4717             :                 }
    4718       30362 :                 if (ok == LOG_OK)
    4719       30362 :                         ok = add_offsets(slot, cnt, total, offset, offsets);
    4720             :         }
    4721       30375 :         if (!locked)
    4722       30375 :                 unlock_table(tr->store, t->base.id);
    4723             : 
    4724       30375 :         if (ok == LOG_OK) {
    4725             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4726       30375 :                 if (!in_transaction) {
    4727        1100 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4728        1100 :                         in_transaction = true;
    4729             :                 }
    4730       30375 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4731       30361 :                         tr->logchanges += (int) total;
    4732       30375 :                 if (*offsets) {
    4733           7 :                         BAT *pos = *offsets;
    4734           7 :                         assert(BATcount(pos) == total);
    4735           7 :                         BATsetcount(pos, total); /* set other properties */
    4736           7 :                         pos->tnil = false;
    4737           7 :                         pos->tnonil = true;
    4738           7 :                         pos->tkey = true;
    4739           7 :                         pos->tsorted = true;
    4740           7 :                         pos->trevsorted = false;
    4741             :                 }
    4742             :         }
    4743       30375 :         return ok;
    4744             : }
    4745             : 
    4746             : static int
    4747     2053094 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4748             : {
                               /*
                                * Claim cnt consecutive slots in the segment list of s on behalf of
                                * transaction tr and store the first claimed slot in *offset.
                                *
                                * Requests for more than one slot that also supply an offsets
                                * out-parameter are handed to claim_segmentsV2.
                                *
                                * locked: when true this function does not call lock_table /
                                *         unlock_table itself (key_claim_tab calls it that way while
                                *         already holding the table lock).
                                *
                                * Returns LOG_OK on success, LOG_ERR when split_segment or
                                * new_segment fails. *offset is only written on success.
                                */
    4749     2053094 :         if (cnt > 1 && offsets)
    4750       30375 :                 return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
    4751     2022719 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4752     2022719 :         assert(s->segs);
    4753     2022719 :         ulng oldest = store_oldest(tr->store, NULL);
    4754     2022719 :         BUN slot = 0;
    4755     2022719 :         int reused = 0;
    4756             : 
    4757     2022719 :         if (!locked)
    4758     1901661 :                 lock_table(tr->store, t->base.id);
    4759             :         /* naive vacuum approach, iterate through segments, check for large enough deleted segments
    4760             :          * or create new segment at the end */
    4761     8519879 :         for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4762     6571155 :                 if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* re-use old deleted or rolledback append */
    4763             : 
                                               /* always true here: the enclosing condition already checked the size */
    4764       73995 :                         if ((seg->end - seg->start) >= cnt) {
    4765             : 
    4766             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4767       73995 :                                 if (p && p->ts == tr->tid && !p->deleted) {
                                                               /* move the boundary: p grows by cnt, the deleted segment shrinks by cnt */
    4768       58513 :                                         slot = p->end;
    4769       58513 :                                         p->end += cnt;
    4770       58513 :                                         seg->start += cnt;
    4771       58513 :                                         reused = 1;
    4772       58513 :                                         break;
    4773             :                                 }
    4774             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4775       15482 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
    4776             :                                         ok = LOG_ERR;
    4777             :                                         break;
    4778             :                                 }
    4779             :                         }
                                               /* seg now covers the claimed slots: mark it as owned by this transaction and live */
    4780       15482 :                         seg->ts = tr->tid;
    4781       15482 :                         seg->deleted = false;
    4782       15482 :                         slot = seg->start;
    4783       15482 :                         reused = 1;
    4784       15482 :                         break;
    4785             :                 }
    4786             :         }
                               /* nothing reusable found: extend the tail segment when it belongs to this
                                * transaction and is not deleted, otherwise append a new segment */
    4787     2022719 :         if (ok == LOG_OK && !reused) {
    4788     1948724 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4789     1913775 :                         slot = s->segs->t->end;
    4790     1913775 :                         s->segs->t->end += cnt;
    4791             :                 } else {
    4792       34949 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4793             :                                 ok = LOG_ERR;
    4794             :                         } else {
    4795       34949 :                                 if (!s->segs->h)
    4796           0 :                                         s->segs->h = s->segs->t;
    4797       34949 :                                 slot = s->segs->t->start;
    4798             :                         }
    4799             :                 }
    4800             :         }
    4801     2022719 :         if (!locked)
    4802     1901661 :                 unlock_table(tr->store, t->base.id);
    4803             : 
    4804     2022719 :         if (ok == LOG_OK) {
    4805             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4806     2022719 :                 if (!in_transaction) {
    4807       48553 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4808       48553 :                         in_transaction = true;
    4809             :                 }
                                       /* count the claimed rows as changes to log; note cnt (size_t) is narrowed to int */
    4810     2022719 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4811     2022292 :                         tr->logchanges += (int) cnt;
    4812     2022719 :                 *offset = slot;
    4813             :         }
    4814             :         return ok;
    4815             : }
    4816             : 
    4817             : /*
    4818             :  * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
    4819             :  * of the global transaction and mark the newly added storage slots unused on the global
    4820             :  * level but used on the local transaction level. Besides this the local transaction needs
    4821             :  * to update (and mark unused) any slot in between the old end and new slots.
    4822             :  * */
    4823             : static int
    4824     1932036 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4825             : {
                               /*
                                * Binds the table's delete/segment storage and forwards to
                                * claim_segments with locked == false, i.e. the callee takes and
                                * releases the table lock itself.
                                * Returns LOG_ERR when the storage cannot be bound, otherwise the
                                * result of claim_segments.
                                */
    4826     1932036 :         storage *s;
    4827             : 
    4828             :         /* we have a single segment structure for each persistent table
    4829             :          * for temporary tables each has its own */
    4830     1932036 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4831             :                 return LOG_ERR;
    4832             : 
    4833     1932036 :         return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
    4834             : }
    4835             : 
    4836             : /* some tables cannot be updated concurrently (user/roles etc) */
    4837             : static int
    4838      121062 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4839             : {
                               /*
                                * Variant of claim_tab that first checks the segments for conflicts.
                                * The table lock is taken here and held across both the conflict
                                * check and the claim, which is why claim_segments is called with
                                * locked == true.
                                *
                                * Returns LOG_ERR when the storage cannot be bound, LOG_CONFLICT
                                * when segments_conflict reports a conflict, otherwise the result
                                * of claim_segments.
                                */
    4840      121062 :         storage *s;
    4841      121062 :         int res = 0;
    4842             : 
    4843             :         /* we have a single segment structure for each persistent table
    4844             :          * for temporary tables each has its own */
    4845      121062 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4846             :                 /* TODO check for other inserts ! */
    4847             :                 return LOG_ERR;
    4848             : 
    4849      121062 :         lock_table(tr->store, t->base.id);
                               /* third argument 1: same parameter that tab_validate passes as "uncommitted" */
    4850      121062 :         if ((res = segments_conflict(tr, s->segs, 1))) {
    4851           4 :                 unlock_table(tr->store, t->base.id);
    4852           4 :                 return LOG_CONFLICT;
    4853             :         }
    4854      121058 :         res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
    4855      121058 :         unlock_table(tr->store, t->base.id);
    4856      121058 :         return res;
    4857             : }
    4858             : 
    4859             : static int
    4860       14767 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
    4861             : {
                               /*
                                * Check the segments of table t for conflicts with transaction tr.
                                * The check runs under the table lock; uncommitted is passed through
                                * unchanged to segments_conflict.
                                *
                                * Returns LOG_ERR when the storage cannot be bound, LOG_CONFLICT
                                * when segments_conflict returns non-zero, LOG_OK otherwise.
                                */
    4862       14767 :         storage *s;
    4863       14767 :         int res = 0;
    4864             : 
    4865       14767 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4866             :                 return LOG_ERR;
    4867             : 
    4868       14767 :         lock_table(tr->store, t->base.id);
    4869       14767 :         res = segments_conflict(tr, s->segs, uncommitted);
    4870       14767 :         unlock_table(tr->store, t->base.id);
    4871       14767 :         return res ? LOG_CONFLICT : LOG_OK;
    4872             : }
    4873             : 
    4874             : static size_t
    4875     1353368 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
    4876             : {
                               /*
                                * Report whether the segment chain starting at s contains a segment
                                * overlapping [start, end) that SEG_IS_DELETED considers deleted for tr.
                                *
                                * The scan stops at the first such segment (the loop condition tests
                                * !cnt), so the return value is the size of that first segment, not a
                                * total; zero means no deleted segment was found in the range.
                                */
    4877     1353368 :         size_t cnt = 0;
    4878             : 
                               /* skip segments that end at or before start */
    4879     1434552 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
    4880             :                 ;
    4881             : 
    4882     3422360 :         for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
    4883     2068991 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
    4884      121307 :                         cnt += s->end - s->start;
    4885             :         }
    4886     1353369 :         return cnt;
    4887             : }
    4888             : 
    4889             : static BAT *
    4890     1353386 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
    4891             : {
                               /*
                                * Build the candidate BAT for positions [start, end) of table t as
                                * seen by transaction tr.
                                *
                                * If has_deletes_in_range finds nothing, a dense BAT covering the
                                * range is returned. Otherwise a TYPE_msk BAT with one bit per
                                * position is filled (bit set when SEG_IS_VALID holds for the
                                * segment covering that position) and converted with
                                * BATmaskedcands.
                                *
                                * The segment list is walked while holding the table lock.
                                * Returns NULL when the mask BAT cannot be allocated; otherwise the
                                * result of BATdense / BATmaskedcands.
                                */
    4892     1353386 :         lock_table(tr->store, t->base.id);
    4893     1353369 :         segment *s = S->segs->h;
    4894             :         /* step one no deletes -> dense range */
                               /* cur: 32-bit mask word currently being assembled;
                                * nr: number of positions in the range; pos: number of bits produced so far */
    4895     1353369 :         uint32_t cur = 0;
    4896     1353369 :         size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
    4897     1353369 :         if (!dnr) {
    4898     1247033 :                 unlock_table(tr->store, t->base.id);
    4899     1247062 :                 return BATdense(start, start, end-start);
    4900             :         }
    4901             : 
    4902      106336 :         BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
    4903      106336 :         if (!b) {
    4904           0 :                 unlock_table(tr->store, t->base.id);
    4905           0 :                 return NULL;
    4906             :         }
    4907             : 
                               /* the mask is written directly into b's storage, one 32-bit word at a time */
    4908      106336 :         uint32_t *restrict dst = Tloc(b, 0);
    4909    59483111 :         for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
    4910    59434042 :                 if (s->end < start)
    4911       43398 :                         continue;
    4912    59390644 :                 if (s->start >= end)
    4913             :                         break;
    4914    59333377 :                 msk m = (SEG_IS_VALID(s, tr));
                                       /* lnr: number of positions of this segment that fall inside [start, end) */
    4915    59333377 :                 size_t lnr = s->end-s->start;
    4916    59333377 :                 if (s->start < start)
    4917        6933 :                         lnr -= (start - s->start);
    4918    59333377 :                 if (s->end > end)
    4919        5091 :                         lnr -= s->end - end;
    4920             : 
    4921    59333377 :                 if (m) {
                                               /* valid segment: emit lnr one-bits.
                                                * NOTE: the local `end` below shadows the function parameter; here it is
                                                * a bit index (exclusive) inside the current 32-bit word */
    4922      992246 :                         size_t used = pos&31, end = 32;
    4923      992246 :                         if (used) {
                                                       /* first fill up the partially used word */
    4924      861824 :                                 if (lnr < (32-used))
    4925      501348 :                                         end = used + lnr;
    4926      861824 :                                 assert(end > used);
                                                       /* used >= 1 here, so the shift count (end - used) is at most 31 */
    4927      861824 :                                 cur |= ((1U << (end - used)) - 1) << used;
    4928      861824 :                                 lnr -= end - used;
    4929      861824 :                                 pos += end - used;
    4930      861824 :                                 if (end == 32) {
    4931      360476 :                                         *dst++ = cur;
    4932      360476 :                                         cur = 0;
    4933             :                                 }
    4934             :                         }
                                               /* then whole words of ones, then the leftover low bits of a fresh word */
    4935      992246 :                         size_t full = lnr/32;
    4936      992246 :                         size_t rest = lnr%32;
    4937      992246 :                         if (full > 0) {
    4938      279420 :                                 memset(dst, ~0, full * sizeof(*dst));
    4939      279420 :                                 dst += full;
    4940      279420 :                                 lnr -= full * 32;
    4941      279420 :                                 pos += full * 32;
    4942             :                         }
    4943      992246 :                         if (rest > 0) {
    4944      460075 :                                 cur |= (1U << rest) - 1;
    4945      460075 :                                 lnr -= rest;
    4946      460075 :                                 pos += rest;
    4947             :                         }
    4948      992246 :                         assert(lnr==0);
    4949             :                 } else {
                                               /* segment not valid for tr: emit lnr zero-bits (cur is left untouched,
                                                * only pos advances); `end` again shadows the parameter */
    4950    58341131 :                         size_t used = pos&31, end = 32;
    4951    58341131 :                         if (used) {
    4952    56506046 :                                 if (lnr < (32-used))
    4953    54589916 :                                         end = used + lnr;
    4954             : 
    4955    56506046 :                                 pos+= (end-used);
    4956    56506046 :                                 lnr-= (end-used);
    4957    56506046 :                                 if (end == 32) {
    4958     1916130 :                                         *dst++ = cur;
    4959     1916130 :                                         cur = 0;
    4960             :                                 }
    4961             :                         }
    4962    58341131 :                         size_t full = lnr/32;
    4963    58341131 :                         size_t rest = lnr%32;
    4964    58341131 :                         memset(dst, 0, full * sizeof(*dst));
    4965    58341131 :                         dst += full;
    4966    58341131 :                         lnr -= full * 32;
    4967    58341131 :                         pos += full * 32;
    4968    58341131 :                         pos+= rest;
    4969    58341131 :                         lnr-= rest;
    4970    58341131 :                         assert(lnr==0);
    4971             :                 }
    4972             :         }
    4973             : 
    4974      106336 :         unlock_table(tr->store, t->base.id);
                               /* flush the last, partially filled word */
    4975      106336 :         if (pos%32)
    4976      101916 :                 *dst=cur;
    4977      106336 :         BATsetcount(b, nr);
    4978      106335 :         bn = BATmaskedcands(start, nr, b, true);
    4979      106336 :         BBPreclaim(b);
                               /* presumably keeps pos referenced when assert() is compiled out */
    4980      106336 :         (void)pos;
    4981      106336 :         assert (pos == nr);
    4982             :         return bn;
    4983             : }
    4984             : 
    4985             : static void *                                   /* BAT * */
    4986     1360645 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
    4987             : {
                               /*
                                * Return the candidate BAT for partition part_nr (0-based) out of
                                * nr_of_parts equally sized slices of table t.
                                *
                                * Returns NULL when tab_timestamp_storage yields no storage, an
                                * empty dense BAT when segs_end reports 0, otherwise the result of
                                * segments2cands for the computed [start, end) range.
                                *
                                * NOTE: nr_of_parts is used as a divisor and is not checked here,
                                * so callers must pass a non-zero value.
                                */
    4988             :         /* with nr_of_parts - part_nr we can adjust parts */
    4989     1360645 :         storage *s = tab_timestamp_storage(tr, t);
    4990             : 
    4991     1360634 :         if (!s)
    4992             :                 return NULL;
    4993     1360634 :         size_t nr = segs_end(s->segs, tr, t);
    4994             : 
    4995     1360623 :         if (!nr)
    4996        7237 :                 return BATdense(0, 0, 0);
    4997             : 
    4998             :         /* compute proper part */
    4999     1353386 :         size_t part_size = nr/nr_of_parts;
    5000     1353386 :         size_t start = part_size * part_nr;
    5001     1353386 :         size_t end = start + part_size;
                               /* the last part also takes the remainder of the integer division */
    5002     1353386 :         if (part_nr == (nr_of_parts-1))
    5003     1283777 :                 end = nr;
    5004     1353386 :         assert(end <= nr);
    5005     1353386 :         return segments2cands(s, tr, t, start, end);
    5006             : }
    5007             : 
    5008             : static int
    5009           1 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
    5010             : {
                               /*
                                * Replace the contents of column col with BAT bn inside
                                * transaction tr.
                                *
                                * Returns LOG_CONFLICT when segments_in_transaction(tr, col->t) is
                                * non-zero or when bind_col_data reports an update conflict,
                                * LOG_ERR when bind_col_data fails otherwise, LOG_OK on success.
                                */
    5011           1 :         bool update_conflict = false;
    5012             : 
    5013           1 :         if (segments_in_transaction(tr, col->t))
    5014             :                 return LOG_CONFLICT;
    5015             : 
                               /* remember the delta installed before binding, to detect whether binding produced a new one */
    5016           1 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5017             : 
    5018           1 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5019           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5020           1 :         assert(d && d->cs.ts == tr->tid);
                               /* only register the change when the delta differs from the one seen before binding */
    5021           1 :         if (odelta != d)
    5022           1 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
                               /* drop the old base and update BATs held by the delta */
    5023           1 :         if (d->cs.bid)
    5024           1 :                 temp_destroy(d->cs.bid);
    5025           1 :         if (d->cs.uibid)
    5026           1 :                 temp_destroy(d->cs.uibid);
    5027           1 :         if (d->cs.uvbid)
    5028           1 :                 temp_destroy(d->cs.uvbid);
                               /* install bn (set read-only) as the new base BAT and reset the update state */
    5029           1 :         bat_set_access(bn, BAT_READ);
    5030           1 :         d->cs.bid = temp_create(bn);
    5031           1 :         d->cs.uibid = 0;
    5032           1 :         d->cs.uvbid = 0;
    5033           1 :         d->cs.ucnt = 0;
    5034           1 :         d->cs.cleared = true;
    5035           1 :         d->cs.ts = tr->tid;
    5036           1 :         ATOMIC_INIT(&d->cs.refcnt, 1);
    5037           1 :         return LOG_OK;
    5038             : }
    5039             : 
    5040             : static int
    5041          58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
    5042             : {
                               /*
                                * Switch column col to storage type st inside transaction tr,
                                * installing o as the new base BAT and, when u is non-NULL, u as
                                * the delta's ebid BAT.
                                *
                                * Returns LOG_CONFLICT when segments_in_transaction(tr, col->t) is
                                * non-zero or bind_col_data reports an update conflict, LOG_ERR
                                * when bind_col_data or transfer_to_systrans fails, LOG_OK on
                                * success.
                                */
    5043          58 :         bool update_conflict = false;
    5044             : 
    5045          58 :         if (segments_in_transaction(tr, col->t))
    5046             :                 return LOG_CONFLICT;
    5047             : 
    5048          58 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5049             : 
    5050          58 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5051           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5052          58 :         assert(d && d->cs.ts == tr->tid);
    5053          58 :         assert(col->t->persistence != SQL_DECLARED_TABLE);
                               /* only register the change when the delta differs from the one seen before binding */
    5054          58 :         if (odelta != d)
    5055          58 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
    5056             : 
    5057          58 :         d->cs.st = st;
    5058          58 :         d->cs.cleared = true;
    5059          58 :         if (d->cs.bid)
    5060          58 :                 temp_destroy(d->cs.bid);
                               /* NOTE(review): if transfer_to_systrans fails below, d->cs.bid has already
                                * been destroyed above but still holds the old id - confirm callers
                                * discard the delta on LOG_ERR */
    5061          58 :         o = transfer_to_systrans(o);
    5062          58 :         if (o == NULL)
    5063             :                 return LOG_ERR;
    5064          58 :         bat_set_access(o, BAT_READ);
    5065          58 :         d->cs.bid = temp_create(o);
    5066          58 :         if (u) {
    5067          53 :                 if (d->cs.ebid)
    5068           0 :                         temp_destroy(d->cs.ebid);
    5069          53 :                 u = transfer_to_systrans(u);
    5070          53 :                 if (u == NULL)
    5071             :                         return LOG_ERR;
    5072          53 :                 d->cs.ebid = temp_create(u);
    5073             :         }
    5074             :         return LOG_OK;
    5075             : }
    5076             : 
    5077             : void
    5078         341 : bat_storage_init( store_functions *sf)
    5079             : {
                               /*
                                * Fill the store_functions table sf with the BAT-based storage
                                * implementations defined in this file. Every assignment below
                                * simply stores the address of the identically named static
                                * function in the matching member of sf.
                                */
                               /* binding / reading */
    5080         341 :         sf->bind_col = &bind_col;
    5081         341 :         sf->bind_updates = &bind_updates;
    5082         341 :         sf->bind_updates_idx = &bind_updates_idx;
    5083         341 :         sf->bind_idx = &bind_idx;
    5084         341 :         sf->bind_cands = &bind_cands;
    5085             : 
                               /* slot claiming and validation */
    5086         341 :         sf->claim_tab = &claim_tab;
    5087         341 :         sf->key_claim_tab = &key_claim_tab;
    5088         341 :         sf->tab_validate = &tab_validate;
    5089             : 
                               /* appends, updates, deletes */
    5090         341 :         sf->append_col = &append_col;
    5091         341 :         sf->append_idx = &append_idx;
    5092             : 
    5093         341 :         sf->update_col = &update_col;
    5094         341 :         sf->update_idx = &update_idx;
    5095             : 
    5096         341 :         sf->delete_tab = &delete_tab;
    5097             : 
                               /* counts, statistics and column properties */
    5098         341 :         sf->count_del = &count_del;
    5099         341 :         sf->count_col = &count_col;
    5100         341 :         sf->count_idx = &count_idx;
    5101         341 :         sf->dcount_col = &dcount_col;
    5102         341 :         sf->min_max_col = &min_max_col;
    5103         341 :         sf->set_stats_col = &set_stats_col;
    5104         341 :         sf->sorted_col = &sorted_col;
    5105         341 :         sf->unique_col = &unique_col;
    5106         341 :         sf->double_elim_col = &double_elim_col;
    5107         341 :         sf->col_stats = &col_stats;
    5108         341 :         sf->col_set_range = &col_set_range;
    5109         341 :         sf->col_not_null = &col_not_null;
    5110             : 
    5111         341 :         sf->col_dup = &col_dup;
    5112         341 :         sf->idx_dup = &idx_dup;
    5113         341 :         sf->del_dup = &del_dup;
    5114             : 
    5115         341 :         sf->create_col = &create_col;    /* create and add to change list */
    5116         341 :         sf->create_idx = &create_idx;
    5117         341 :         sf->create_del = &create_del;
    5118             : 
    5119         341 :         sf->destroy_col = &destroy_col;  /* free resources */
    5120         341 :         sf->destroy_idx = &destroy_idx;
    5121         341 :         sf->destroy_del = &destroy_del;
    5122             : 
    5123         341 :         sf->drop_col = &drop_col;                /* add drop to change list */
    5124         341 :         sf->drop_idx = &drop_idx;
    5125         341 :         sf->drop_del = &drop_del;
    5126             : 
    5127         341 :         sf->clear_table = &clear_table;
    5128             : 
    5129         341 :         sf->swap_bats = &swap_bats;
    5130         341 :         sf->col_compress = &col_compress;
    5131         341 : }
    5132             : 
    5133             : #if 0
                       /*
                        * Disabled code: everything up to the matching #endif is excluded
                        * from compilation by the #if 0 above.
                        */
                       /*
                        * Number of rows in fc's base BAT beyond its batInserted mark, or 0
                        * when fc is NULL, GDKinmemory(0) is true, or the column is not
                        * allocated.
                        * NOTE(review): the offset parameter is never written by this function.
                        */
    5134             : static lng
    5135             : log_get_nr_inserted(sql_column *fc, lng *offset)
    5136             : {
    5137             :         lng cnt = 0;
    5138             : 
    5139             :         if (!fc || GDKinmemory(0))
    5140             :                 return 0;
    5141             : 
    5142             :         if (fc->base.atime && fc->base.allocated) {
    5143             :                 sql_delta *fb = fc->data;
    5144             :                 BAT *ins = temp_descriptor(fb->cs.bid);
    5145             : 
    5146             :                 if (ins && BATcount(ins) > 0 && BATcount(ins) > ins->batInserted) {
    5147             :                         cnt = BATcount(ins) - ins->batInserted;
    5148             :                 }
    5149             :                 bat_destroy(ins);
    5150             :         }
    5151             :         return cnt;
    5152             : }
    5153             : 
                       /*
                        * Same computation for the BAT referenced by ft's storage; when rows
                        * exist beyond batInserted, *offset is additionally set to
                        * batInserted.
                        */
    5154             : static lng
    5155             : log_get_nr_deleted(sql_table *ft, lng *offset)
    5156             : {
    5157             :         lng cnt = 0;
    5158             : 
    5159             :         if (!ft || GDKinmemory(0))
    5160             :                 return 0;
    5161             : 
    5162             :         if (ft->base.atime && ft->base.allocated) {
    5163             :                 storage *fdb = ft->data;
    5164             :                 BAT *db = temp_descriptor(fdb->cs.bid);
    5165             : 
    5166             :                 if (db && BATcount(db) > 0 && BATcount(db) > db->batInserted) {
    5167             :                         cnt = BATcount(db) - db->batInserted;
    5168             :                         *offset = db->batInserted;
    5169             :                 }
    5170             :                 bat_destroy(db);
    5171             :         }
    5172             :         return cnt;
    5173             : }
    5174             : #endif

Generated by: LCOV version 1.14