LCOV - code coverage report
Current view: top level - sql/storage/bat - bat_storage.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 2463 3160 77.9 %
Date: 2024-04-26 00:35:57 Functions: 156 163 95.7 %

          Line data    Source code
       1             : /*
       2             :  * SPDX-License-Identifier: MPL-2.0
       3             :  *
       4             :  * This Source Code Form is subject to the terms of the Mozilla Public
       5             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       6             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       7             :  *
       8             :  * Copyright 2024 MonetDB Foundation;
       9             :  * Copyright August 2008 - 2023 MonetDB B.V.;
      10             :  * Copyright 1997 - July 2008 CWI.
      11             :  */
      12             : 
      13             : #include "monetdb_config.h"
      14             : #include "bat_storage.h"
      15             : #include "bat_utils.h"
      16             : #include "sql_string.h"
      17             : #include "gdk_atoms.h"
      18             : #include "gdk_atoms.h"
      19             : #include "matomic.h"
      20             : 
      21             : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
      22             : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
      23             : 
      24             : static int log_update_col( sql_trans *tr, sql_change *c);
      25             : static int log_update_idx( sql_trans *tr, sql_change *c);
      26             : static int log_update_del( sql_trans *tr, sql_change *c);
      27             : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      28             : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      29             : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
      30             : static int log_create_col(sql_trans *tr, sql_change *change);
      31             : static int log_create_idx(sql_trans *tr, sql_change *change);
      32             : static int log_create_del(sql_trans *tr, sql_change *change);
      33             : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      34             : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      35             : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
      36             : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
      37             : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
      38             : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
      39             : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
      40             : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
      41             : 
      42             : static void merge_delta( sql_delta *obat);
      43             : 
      44             : /* valid
      45             :  * !deleted && VALID_4_READ(TS, tr)                             existing or newly created segment
      46             :  *  deleted && TS > tr->ts && OLDTS < tr->ts                deleted after current transaction
      47             :  */
      48             : 
      49             : #define VALID_4_READ(TS,tr) \
      50             :         (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
      51             : 
      52             : /* when changed, check if the old status is still valid */
      53             : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
      54             :                 (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
      55             : 
      56             : #define SEG_VALID_4_DELETE(seg,tr) \
      57             :         (!seg->deleted && VALID_4_READ(seg->ts, tr))
      58             : 
      59             : /* Deleted (in the current transaction or by some other finished transaction), or a re-used segment which used to be deleted */
      60             : #define SEG_IS_DELETED(seg,tr) \
      61             :         ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
      62             :          (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
      63             : 
      64             : /* A segment is part of the current transaction in some way, or is deleted by some other transaction but used to be valid */
      65             : #define SEG_IS_VALID(seg, tr) \
      66             :                 ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
      67             :                  (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
      68             : 
      69             : static inline BAT *
      70        5152 : transfer_to_systrans(BAT *b)
      71             : {
      72             :         /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
      73        5152 :         MT_lock_set(&b->theaplock);
      74        5152 :         if (VIEWtparent(b) || VIEWvtparent(b)) {
      75          18 :                 MT_lock_unset(&b->theaplock);
      76          18 :                 BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
      77          18 :                 BBPreclaim(b);
      78          18 :                 return bn;
      79             :         }
      80        5134 :         if (b->theap->farmid == TRANSIENT ||
      81          14 :                 (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
      82        4749 :                 QryCtx *qc = MT_thread_get_qry_ctx();
      83        4750 :                 if (qc) {
      84        2424 :                         if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
      85        2423 :                                 ATOMIC_SUB(&qc->datasize, b->theap->size);
      86        2423 :                                 b->theap->farmid = SYSTRANS;
      87        2423 :                                 b->batRole = SYSTRANS;
      88             :                         }
      89        2424 :                         if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
      90        1061 :                                 ATOMIC_SUB(&qc->datasize, b->tvheap->size);
      91        1061 :                                 b->tvheap->farmid = SYSTRANS;
      92             :                         }
      93             :                 }
      94             :         }
      95        5135 :         MT_lock_unset(&b->theaplock);
      96        5134 :         return b;
      97             : }
      98             : 
      99             : static void
     100    26220459 : lock_table(sqlstore *store, sqlid id)
     101             : {
     102    26220459 :         MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     103    26309700 : }
     104             : 
     105             : static void
     106    26309216 : unlock_table(sqlstore *store, sqlid id)
     107             : {
     108    26309216 :         MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
     109    26307788 : }
     110             : 
     111             : static void
     112    19917110 : lock_column(sqlstore *store, sqlid id)
     113             : {
     114    19917110 :         MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     115    19950155 : }
     116             : 
     117             : static void
     118    19938144 : unlock_column(sqlstore *store, sqlid id)
     119             : {
     120    19938144 :         MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
     121    19957956 : }
     122             : 
     123             : static void
     124      109635 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     125             : {
     126      109635 :         assert(cleanup);
     127      109635 :         trans_add(tr, dup_base(b), data, cleanup, commit, log);
     128      109633 : }
     129             : 
     130             : static void
     131      133019 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
     132             : {
     133      133019 :         assert(cleanup);
     134      133019 :         dup_base(&t->base);
     135      133018 :         trans_add(tr, b, data, cleanup, commit, log);
     136      133004 : }
     137             : 
     138             : static int
     139       76103 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
     140             : {
     141       76103 :         segment *s = change->data;
     142             : 
     143       76103 :         if (s->ts <= oldest) {
     144       32884 :                 while(s) {
     145       20435 :                         segment *n = s->prev;
     146       20435 :                         ATOMIC_PTR_DESTROY(&s->next);
     147       20435 :                         _DELETE(s);
     148       20435 :                         s = n;
     149             :                 }
     150       12449 :                 sqlstore *store = Store;
     151       12449 :                 table_destroy(store, (sql_table*)change->obj);
     152       12449 :                 return 1;
     153             :         }
     154             :         return LOG_OK;
     155             : }
     156             : 
     157             : static void
     158       20435 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
     159             : {
     160             :         /* we can only be accessed by anything older than commit_ts */
     161       20435 :         if (c->cleanup == &tc_gc_seg)
     162        7986 :                 s->prev = c->data;
     163             :         else
     164       12449 :                 c->cleanup = &tc_gc_seg;
     165       20435 :         c->data = s;
     166       20435 :         s->ts = commit_ts;
     167       16063 : }
     168             : 
     169             : static segment *
     170       86502 : new_segment(segment *o, sql_trans *tr, size_t cnt)
     171             : {
     172       86502 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     173             : 
     174       86504 :         assert(tr);
     175       86504 :         if (n) {
     176       86504 :                 *n = (segment) {
     177       86504 :                         .ts = tr->tid,
     178             :                         .oldts = 0,
     179             :                         .deleted = false,
     180             :                         .start = 0,
     181             :                         .end = cnt,
     182             :                         .next = ATOMIC_PTR_VAR_INIT(NULL),
     183             :                         .prev = NULL,
     184             :                 };
     185       86504 :                 if (o) {
     186       35690 :                         n->start += o->end;
     187       35690 :                         n->end += o->end;
     188       35690 :                         ATOMIC_PTR_SET(&o->next, n);
     189             :                 }
     190             :         }
     191       86504 :         return n;
     192             : }
     193             : 
     194             : static segment *
     195       87579 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
     196             : {
     197       87579 :         assert(tr);
     198       87579 :         if (o->start == start && o->end == start+cnt) {
     199        9621 :                 assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
     200        9621 :                 o->oldts = o->ts;
     201        9621 :                 o->ts = tr->tid;
     202        9621 :                 o->deleted = deleted;
     203        9621 :                 return o;
     204             :         }
     205       77958 :         segment *n = (segment*)GDKmalloc(sizeof(segment));
     206             : 
     207       77958 :         if (!n)
     208             :                 return NULL;
     209       77958 :         n->prev = NULL;
     210             : 
     211       77958 :         if (o->ts == tr->tid) {
     212        4052 :                 n->oldts = 0;
     213        4052 :                 n->ts = 1;
     214        4052 :                 n->deleted = true;
     215             :         } else {
     216       73906 :                 n->oldts = o->ts;
     217       73906 :                 n->ts = tr->tid;
     218       73906 :                 n->deleted = deleted;
     219             :         }
     220       77958 :         if (start == o->start) {
     221             :                 /* 2-way split: o remains latter part of segment, new one is
     222             :                  * inserted before */
     223       63482 :                 n->start = o->start;
     224       63482 :                 n->end = n->start + cnt;
     225       63482 :                 ATOMIC_PTR_INIT(&n->next, o);
     226       63482 :                 if (segs->h == o)
     227         454 :                         segs->h = n;
     228       63482 :                 if (p)
     229       63028 :                         ATOMIC_PTR_SET(&p->next, n);
     230       63482 :                 o->start = n->end;
     231       14476 :         } else if (start+cnt == o->end) {
     232             :                 /* 2-way split: o remains first part of segment, new one is
     233             :                  * added after */
     234        5271 :                 n->start = o->end - cnt;
     235        5271 :                 n->end = o->end;
     236        5271 :                 ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
     237        5271 :                 ATOMIC_PTR_SET(&o->next, n);
     238        5271 :                 if (segs->t == o)
     239         805 :                         segs->t = n;
     240        5271 :                 o->end = n->start;
     241             :         } else {
     242             :                 /* 3-way split: o remains first part of segment, two new ones
     243             :                  * are added after */
     244        9205 :                 segment *n2 = GDKmalloc(sizeof(segment));
     245        9205 :                 if (n2 == NULL) {
     246           0 :                         GDKfree(n);
     247           0 :                         return NULL;
     248             :                 }
     249        9205 :                 ATOMIC_PTR_INIT(&n->next, n2);
     250        9205 :                 n->start = start;
     251        9205 :                 n->end = start + cnt;
     252        9205 :                 *n2 = *o;
     253        9205 :                 ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
     254        9205 :                 n2->start = n->end;
     255        9205 :                 n2->prev = NULL;
     256        9205 :                 if (segs->t == o)
     257        3455 :                         segs->t = n2;
     258        9205 :                 ATOMIC_PTR_SET(&o->next, n);
     259        9205 :                 o->end = start;
     260             :         }
     261             :         return n;
     262             : }
     263             : 
     264             : static void
     265        3485 : rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
     266             : {
     267        3485 :         segment *cur = segs->h, *seg = NULL;
     268       15356 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     269       11871 :                 if (cur->ts == tr->tid) { /* revert */
     270        3952 :                         cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
     271        3952 :                         cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
     272        3952 :                         cur->oldts = 0;
     273             :                 }
     274       11871 :                 if (cur->ts <= oldest) { /* possibly merge range */
     275       11468 :                         if (!seg) { /* skip first */
     276             :                                 seg = cur;
     277        7983 :                         } else if (seg->end == cur->start && seg->deleted == cur->deleted) {
     278             :                                 /* merge with previous */
     279        4372 :                                 seg->end = cur->end;
     280        4372 :                                 ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     281        4372 :                                 if (cur == segs->t)
     282        2677 :                                         segs->t = seg;
     283        4372 :                                 mark4destroy(cur, change, store_get_timestamp(tr->store));
     284        4372 :                                 cur = seg;
     285             :                         } else {
     286             :                                 seg = cur; /* begin of new merge */
     287             :                         }
     288             :                 }
     289             :         }
     290        3485 : }
     291             : 
     292             : static size_t
     293      100506 : segs_end_include_deleted( segments *segs, sql_trans *tr)
     294             : {
     295      100506 :         size_t cnt = 0;
     296      100506 :         segment *s = segs->h, *l = NULL;
     297             : 
     298      466626 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     299      366120 :                 if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
     300             :                                 l = s;
     301             :         }
     302      100506 :         if (l)
     303      100499 :                 cnt = l->end;
     304      100506 :         return cnt;
     305             : }
     306             : 
     307             : static int
     308      100506 : segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
     309             : {
     310             :         /* set bits correctly */
     311      100506 :         BAT *b = temp_descriptor(cs->bid);
     312             : 
     313      100506 :         if (!b)
     314             :                 return LOG_ERR;
     315      100506 :         segment *s = segs->h;
     316             : 
     317      100506 :         size_t nr = segs_end_include_deleted(segs, tr);
     318      100506 :         size_t rounded_nr = ((nr+31)&~31);
     319      100506 :         if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
     320           0 :                 bat_destroy(b);
     321           0 :                 return LOG_ERR;
     322             :         }
     323             : 
     324             :         /* disable all properties here */
     325      100506 :         MT_lock_set(&b->theaplock);
     326      100506 :         b->tsorted = false;
     327      100506 :         b->trevsorted = false;
     328      100506 :         b->tnosorted = 0;
     329      100506 :         b->tnorevsorted = 0;
     330      100506 :         b->tseqbase = oid_nil;
     331      100506 :         b->tkey = false;
     332      100506 :         b->tnokey[0] = 0;
     333      100506 :         b->tnokey[1] = 0;
     334      100506 :         b->theap->dirty = true;
     335      100506 :         BUN cnt = BATcount(b);
     336      100506 :         MT_lock_unset(&b->theaplock);
     337             : 
     338      100506 :         uint32_t *restrict dst;
     339             :         /* why hashlock ?? */
     340      100506 :         MT_rwlock_wrlock(&b->thashlock);
     341      509601 :         for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
     342      349541 :                 if (s->start >= nr)
     343             :                         break;
     344      308589 :                 if (s->ts == tr->tid && s->end != s->start) {
     345      145056 :                         if (cnt < s->start) { /* first mark as deleted ! */
     346        3173 :                                 size_t lnr = s->start-cnt;
     347        3173 :                                 size_t pos = cnt;
     348        3173 :                                 dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     349        3173 :                                 uint32_t cur = 0;
     350             : 
     351        3173 :                                 size_t used = pos&31, end = 32;
     352        3173 :                                 if (used) {
     353        3072 :                                         if (lnr < (32-used))
     354        2924 :                                                 end = used + lnr;
     355        3072 :                                         assert(end > used);
     356        3072 :                                         cur |= ((1U << (end - used)) - 1) << used;
     357        3072 :                                         lnr -= end - used;
     358        3072 :                                         *dst++ |= cur;
     359        3072 :                                         cur = 0;
     360             :                                 }
     361        3173 :                                 size_t full = lnr/32;
     362        3173 :                                 size_t rest = lnr%32;
     363        3173 :                                 if (full > 0) {
     364           6 :                                         memset(dst, ~0, full * sizeof(*dst));
     365           6 :                                         dst += full;
     366           6 :                                         lnr -= full * 32;
     367             :                                 }
     368        3173 :                                 if (rest > 0) {
     369         142 :                                         cur |= (1U << rest) - 1;
     370         142 :                                         lnr -= rest;
     371         142 :                                         *dst |= cur;
     372             :                                 }
     373        3173 :                                 assert(lnr==0);
     374             :                         }
     375      145056 :                         size_t lnr = s->end-s->start;
     376      145056 :                         size_t pos = s->start;
     377      145056 :                         dst = (uint32_t *) Tloc(b, 0) + (pos/32);
     378      145056 :                         uint32_t cur = 0;
     379      145056 :                         size_t used = pos&31, end = 32;
     380      145056 :                         if (used) {
     381      107437 :                                 if (lnr < (32-used))
     382      101527 :                                         end = used + lnr;
     383      107437 :                                 assert(end > used);
     384      107437 :                                 cur |= ((1U << (end - used)) - 1) << used;
     385      107437 :                                 lnr -= end - used;
     386      107437 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     387      107437 :                                 dst++;
     388      107437 :                                 cur = 0;
     389             :                         }
     390      145056 :                         size_t full = lnr/32;
     391      145056 :                         size_t rest = lnr%32;
     392      145056 :                         if (full > 0) {
     393        3513 :                                 memset(dst, s->deleted?~0:0, full * sizeof(*dst));
     394        3513 :                                 dst += full;
     395        3513 :                                 lnr -= full * 32;
     396             :                         }
     397      145056 :                         if (rest > 0) {
     398       40540 :                                 cur |= (1U << rest) - 1;
     399       40540 :                                 lnr -= rest;
     400       40540 :                                 *dst = s->deleted ? *dst | cur : *dst & ~cur;
     401             :                         }
     402      145056 :                         assert(lnr==0);
     403      145056 :                         if (cnt < s->end)
     404      308589 :                                 cnt = s->end;
     405             :                 }
     406             :         }
     407      100506 :         MT_rwlock_wrunlock(&b->thashlock);
     408      100506 :         if (nr > BATcount(b)) {
     409       61946 :                 MT_lock_set(&b->theaplock);
     410       61946 :                 BATsetcount(b, nr);
     411       61946 :                 MT_lock_unset(&b->theaplock);
     412             :         }
     413             : 
     414      100506 :         bat_destroy(b);
     415      100506 :         return LOG_OK;
     416             : }
     417             : 
     418             : /* TODO return LOG_OK/ERR */
     419             : static void
     420      100532 : merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
     421             : {
     422      100532 :         sqlstore* store = tr->store;
     423      100532 :         segment *cur = s->segs->h, *seg = NULL;
     424      466724 :         for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
     425      366192 :                 if (cur->ts == tr->tid) {
     426      158705 :                         if (!cur->deleted)
     427       89990 :                                 cur->oldts = 0;
     428      158705 :                         cur->ts = commit_ts;
     429             :                 }
     430      366192 :                 if (!seg) {
     431             :                         /* first segment */
     432             :                         seg = cur;
     433             :                 }
     434      265660 :                 else if (seg->ts < TRANSACTION_ID_BASE) {
     435             :                         /* possible merge since both deleted flags are equal */
     436      246891 :                         if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
     437      184859 :                                 int merge = 1;
     438      184859 :                                 node *n = store->active->h;
     439      581425 :                                 for (int i = 0; i < store->active->cnt; i++, n = n->next) {
     440      482948 :                                         sql_trans* other = ((sql_trans*)n->data);
     441      482948 :                                         ulng active = other->ts;
     442      482948 :                                         if(other->active == 2)
     443       26914 :                                                 continue; /* pretend that another recently committed transaction is no longer active */
     444      456034 :                                         if (active == tr->ts)
     445      129362 :                                                 continue; /* pretend that committing transaction has already committed and is no longer active */
     446      326672 :                                         if (seg->ts < active && cur->ts < active)
     447             :                                                 break;
     448      315051 :                                         if (seg->ts > active && cur->ts > active)
     449      240290 :                                                 continue;
     450             : 
     451       74761 :                                         assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
     452             :                                         /* cannot safely merge since there is an active transaction between the segments */
     453             :                                         merge = false;
     454             :                                         break;
     455             :                                 }
     456             :                                 /* merge segments */
     457      220196 :                                 if (merge) {
     458      110098 :                                         seg->end = cur->end;
     459      110098 :                                         ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
     460      110098 :                                         if (cur == s->segs->t)
     461       26569 :                                                 s->segs->t = seg;
     462      110098 :                                         if (commit_ts == oldest) {
     463       94035 :                                                 ATOMIC_PTR_DESTROY(&cur->next);
     464       94035 :                                                 _DELETE(cur);
     465             :                                         } else
     466       32126 :                                                 mark4destroy(cur, change, commit_ts);
     467      110098 :                                         cur = seg;
     468      110098 :                                         continue;
     469             :                                 }
     470             :                         }
     471             :                 }
     472             :                 seg = cur;
     473             :         }
     474      100532 : }
     475             : 
     476             : static int
     477     2119110 : segments_in_transaction(sql_trans *tr, sql_table *t)
     478             : {
     479     2119110 :         storage *s = ATOMIC_PTR_GET(&t->data);
     480     2119110 :         segment *seg = s->segs->h;
     481             : 
     482     2119110 :         if (seg && s->segs->t->ts == tr->tid)
     483             :                 return 1;
     484      636614 :         for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
     485      534990 :                 if (seg->ts == tr->tid)
     486             :                         return 1;
     487             :         }
     488             :         return 0;
     489             : }
     490             : 
     491             : static size_t
     492    18795497 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
     493             : {
     494    18795497 :         size_t cnt = 0;
     495             : 
     496    18795497 :         lock_table(tr->store, table->base.id);
     497    18862295 :         segment *s = segs->h, *l = NULL;
     498             : 
     499    18862295 :         if (segs->t && SEG_IS_VALID(segs->t, tr))
     500    15723325 :                 l = s = segs->t;
     501             : 
     502   216332575 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     503   197470630 :                 if (SEG_IS_VALID(s, tr))
     504             :                                 l = s;
     505             :         }
     506    18861945 :         if (l)
     507    18844715 :                 cnt = l->end;
     508    18861945 :         unlock_table(tr->store, table->base.id);
     509    18861038 :         return cnt;
     510             : }
     511             : 
     512             : static segments *
     513       50812 : new_segments(sql_trans *tr, size_t cnt)
     514             : {
     515       50812 :         segments *n = (segments*)GDKmalloc(sizeof(segments));
     516             : 
     517       50814 :         if (n) {
     518       50814 :                 n->h = n->t = new_segment(NULL, tr, cnt);
     519       50811 :                 if (!n->h) {
     520           0 :                         GDKfree(n);
     521           0 :                         return NULL;
     522             :                 }
     523       50811 :                 sql_ref_init(&n->r);
     524             :         }
     525             :         return n;
     526             : }
     527             : 
     528             : static sql_delta *
     529    32481078 : timestamp_delta( sql_trans *tr, sql_delta *d)
     530             : {
     531    32534702 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     532       53624 :                 d = d->next;
     533    32487987 :         return d;
     534             : }
     535             : 
     536             : static sql_delta *
     537    32319647 : col_timestamp_delta( sql_trans *tr, sql_column *c)
     538             : {
     539    32319647 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
     540             : }
     541             : 
     542             : static sql_delta *
     543       25236 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
     544             : {
     545       25236 :         return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
     546             : }
     547             : 
     548             : static storage *
     549    19755407 : timestamp_storage( sql_trans *tr, storage *d)
     550             : {
     551    19755407 :         if (!d)
     552             :                 return NULL;
     553    19828003 :         while (d->next && !VALID_4_READ(d->cs.ts, tr))
     554       72596 :                 d = d->next;
     555             :         return d;
     556             : }
     557             : 
     558             : static storage *
     559    19738078 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
     560             : {
     561    19738078 :         return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
     562             : }
     563             : 
     564             : static sql_delta*
     565       18536 : delta_dup(sql_delta *d)
     566             : {
     567       18536 :         ATOMIC_INC(&d->cs.refcnt);
     568       18536 :         return d;
     569             : }
     570             : 
     571             : static void *
     572       17167 : col_dup(sql_column *c)
     573             : {
     574       17167 :         return delta_dup(ATOMIC_PTR_GET(&c->data));
     575             : }
     576             : 
     577             : static void *
     578        2656 : idx_dup(sql_idx *i)
     579             : {
     580        2656 :         if (!ATOMIC_PTR_GET(&i->data))
     581             :                 return NULL;
     582        1369 :         return delta_dup(ATOMIC_PTR_GET(&i->data));
     583             : }
     584             : 
     585             : static storage*
     586        1480 : storage_dup(storage *d)
     587             : {
     588        1480 :         ATOMIC_INC(&d->cs.refcnt);
     589        1480 :         return d;
     590             : }
     591             : 
     592             : static void *
     593        1480 : del_dup(sql_table *t)
     594             : {
     595        1480 :         return storage_dup(ATOMIC_PTR_GET(&t->data));
     596             : }
     597             : 
     598             : static size_t
     599          17 : count_inserts( segment *s, sql_trans *tr)
     600             : {
     601          17 :         size_t cnt = 0;
     602             : 
     603          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     604          55 :                 if (!s->deleted && s->ts == tr->tid)
     605           4 :                         cnt += s->end - s->start;
     606             :         }
     607          17 :         return cnt;
     608             : }
     609             : 
     610             : static size_t
     611      836462 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
     612             : {
     613      836462 :         size_t cnt = 0;
     614             : 
     615      978181 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
     616             :                 ;
     617             : 
     618     4833080 :         for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
     619     3996615 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
     620     1310719 :                         cnt += s->end - s->start;
     621             :         }
     622      836465 :         return cnt;
     623             : }
     624             : 
     625             : static size_t
     626          17 : count_deletes( segment *s, sql_trans *tr)
     627             : {
     628          17 :         size_t cnt = 0;
     629             : 
     630          72 :         for(;s; s = ATOMIC_PTR_GET(&s->next)) {
     631          55 :                 if (SEG_IS_DELETED(s, tr))
     632          17 :                         cnt += s->end - s->start;
     633             :         }
     634          17 :         return cnt;
     635             : }
     636             : 
     637             : #define CNT_ACTIVE 10
     638             : 
     639             : static size_t
     640    18194771 : count_col(sql_trans *tr, sql_column *c, int access)
     641             : {
     642    18194771 :         storage *d;
     643    18194771 :         sql_delta *ds;
     644             : 
     645    18194771 :         if (!isTable(c->t))
     646             :                 return 0;
     647    18194771 :         d = tab_timestamp_storage(tr, c->t);
     648    18206820 :         ds = col_timestamp_delta(tr, c);
     649    18230690 :         if (!d ||!ds)
     650             :                 return 0;
     651    18230690 :         if (access == 2)
     652      445120 :                 return ds?ds->cs.ucnt:0;
     653    17785570 :         if (access == 1)
     654          17 :                 return count_inserts(d->segs->h, tr);
     655    17785553 :         if (access == QUICK)
     656      521081 :                 return d->segs->t?d->segs->t->end:0;
     657    17264472 :         if (access == CNT_ACTIVE) {
     658      836276 :                 size_t cnt = segs_end(d->segs, tr, c->t);
     659      836650 :                 lock_table(tr->store, c->t->base.id);
     660      836476 :                 cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
     661      836489 :                 unlock_table(tr->store, c->t->base.id);
     662      836489 :                 return cnt;
     663             :         }
     664    16428196 :         return segs_end(d->segs, tr, c->t);
     665             : }
     666             : 
     667             : static size_t
     668       21104 : count_idx(sql_trans *tr, sql_idx *i, int access)
     669             : {
     670       21104 :         storage *d;
     671       21104 :         sql_delta *ds;
     672             : 
     673       21104 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
     674        4595 :                 return 0;
     675       16510 :         d = tab_timestamp_storage(tr, i->t);
     676       16541 :         ds = idx_timestamp_delta(tr, i);
     677       16556 :         if (!d || !ds)
     678             :                 return 0;
     679       16556 :         if (access == 2)
     680        2848 :                 return ds?ds->cs.ucnt:0;
     681       13708 :         if (access == 1)
     682           0 :                 return count_inserts(d->segs->h, tr);
     683       13708 :         if (access == QUICK)
     684        3301 :                 return d->segs->t?d->segs->t->end:0;
     685       10407 :         return segs_end(d->segs, tr, i->t);
     686             : }
     687             : 
     688             : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
     689             : static BAT *
     690    13933328 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
     691             : {
     692    13933328 :         BAT *b;
     693             : 
     694    13933328 :         assert(access == RD_UPD_ID || access == RD_UPD_VAL);
     695             :         /* returns the updates for cs */
     696    13933328 :         if (cs->uibid && cs->uvbid && cs->ucnt) {
     697        7616 :                 if (access == RD_UPD_ID) {
     698        4906 :                         if (!(b = temp_descriptor(cs->uibid)))
     699             :                                 return NULL;
     700        4906 :                         if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
     701         936 :                            (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
     702        3970 :                                         oid nil = oid_nil;
     703             :                                         /* less then cnt */
     704        3970 :                                         BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false);
     705        3970 :                                         if (!s) {
     706           0 :                                                 bat_destroy(b);
     707           0 :                                                 return NULL;
     708             :                                         }
     709             : 
     710        3970 :                                         BAT *nb = BATproject(s, b);
     711        3970 :                                         bat_destroy(s);
     712        3970 :                                         bat_destroy(b);
     713        3970 :                                         b = nb;
     714             :                         }
     715             :                 } else {
     716        2710 :                         b = temp_descriptor(cs->uvbid);
     717             :                 }
     718             :         } else {
     719    23210494 :                 b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
     720             :         }
     721             :         return b;
     722             : }
     723             : 
     724             : static BAT *
     725           0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
     726             : {
     727           0 :         int err = 0;
     728           0 :         BAT *uv = *UV;
     729           0 :         BUN cnt = BATcount(ui)+BATcount(oi);
     730           0 :         BATiter uvi;
     731           0 :         BATiter ovi;
     732             : 
     733           0 :         if (uv) {
     734           0 :                 uvi = bat_iterator(uv);
     735           0 :                 ovi = bat_iterator(ov);
     736             :         }
     737             : 
     738             :         /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
     739           0 :         BUN uip = 0, uie = BATcount(ui);
     740           0 :         BUN oip = 0, oie = BATcount(oi);
     741             : 
     742           0 :         oid uiseqb = ui->tseqbase;
     743           0 :         oid oiseqb = oi->tseqbase;
     744           0 :         oid *uipt = NULL, *oipt = NULL;
     745           0 :         BATiter uii = bat_iterator(ui);
     746           0 :         BATiter oii = bat_iterator(oi);
     747           0 :         if (!BATtdensebi(&uii))
     748           0 :                 uipt = uii.base;
     749           0 :         if (!BATtdensebi(&oii))
     750           0 :                 oipt = oii.base;
     751             : 
     752           0 :         if (uiseqb == oiseqb && uie == oie) { /* full overlap, no values */
     753           0 :                 if (uv) {
     754           0 :                         bat_iterator_end(&uvi);
     755           0 :                         bat_iterator_end(&ovi);
     756             :                 }
     757           0 :                 bat_iterator_end(&uii);
     758           0 :                 bat_iterator_end(&oii);
     759           0 :                 if (uv) {
     760           0 :                         *UV = uv;
     761             :                 } else {
     762           0 :                         bat_destroy(uv);
     763             :                 }
     764           0 :                 bat_destroy(oi);
     765           0 :                 bat_destroy(ov);
     766           0 :                 return ui;
     767             :         }
     768           0 :         BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
     769           0 :         BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
     770             : 
     771           0 :         if (!ni || (uv && !nv)) {
     772           0 :                 bat_destroy(ni);
     773           0 :                 bat_destroy(nv);
     774           0 :                 bat_destroy(ui);
     775           0 :                 bat_destroy(uv);
     776           0 :                 bat_destroy(oi);
     777           0 :                 bat_destroy(ov);
     778           0 :                 return NULL;
     779             :         }
     780           0 :         while (uip < uie && oip < oie && !err) {
     781           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     782           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     783             : 
     784           0 :                 if (uiid <= oiid) {
     785           0 :                         if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     786           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     787             :                                 err = 1;
     788           0 :                         uip++;
     789           0 :                         if (uiid == oiid)
     790           0 :                                 oip++;
     791             :                 } else { /* uiid > oiid */
     792           0 :                         if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     793           0 :                             (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     794             :                                 err = 1;
     795           0 :                         oip++;
     796             :                 }
     797             :         }
     798           0 :         while (uip < uie && !err) {
     799           0 :                 oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
     800           0 :                 if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
     801           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
     802             :                         err = 1;
     803           0 :                 uip++;
     804             :         }
     805           0 :         while (oip < oie && !err) {
     806           0 :                 oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
     807           0 :                 if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
     808           0 :                     (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
     809             :                         err = 1;
     810           0 :                 oip++;
     811             :         }
     812           0 :         if (uv) {
     813           0 :                 bat_iterator_end(&uvi);
     814           0 :                 bat_iterator_end(&ovi);
     815             :         }
     816           0 :         bat_iterator_end(&uii);
     817           0 :         bat_iterator_end(&oii);
     818           0 :         bat_destroy(ui);
     819           0 :         bat_destroy(uv);
     820           0 :         bat_destroy(oi);
     821           0 :         bat_destroy(ov);
     822           0 :         if (!err) {
     823           0 :                 if (nv)
     824           0 :                         *UV = nv;
     825           0 :                 return ni;
     826             :         }
     827           0 :         *UV = NULL;
     828           0 :         bat_destroy(ni);
     829           0 :         bat_destroy(nv);
     830           0 :         return NULL;
     831             : }
     832             : 
     833             : static sql_delta *
     834     9294339 : older_delta( sql_delta *d, sql_trans *tr)
     835             : {
     836     9294339 :         sql_delta *o = d->next;
     837             : 
     838     9299485 :         while (o && !o->cs.merged) {
     839        5133 :                 if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     840             :                         break;
     841             :                 else
     842        5146 :                         o = o->next;
     843             :         }
     844     9294352 :         if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
     845           0 :                 return o;
     846             :         return NULL;
     847             : }
     848             : 
     849             : static BAT *
     850     9289407 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
     851             : {
     852     9289407 :         assert(tr->active);
     853     9289407 :         sql_delta *o = NULL;
     854     9289407 :         BAT *ui = NULL, *uv = NULL;
     855             : 
     856     9289407 :         if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
     857             :                 return NULL;
     858     9293947 :         if (access == RD_UPD_VAL) {
     859     4647278 :                 if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
     860           0 :                         bat_destroy(ui);
     861           0 :                         return NULL;
     862             :                 }
     863             :         }
     864     9294275 :         while ((o = older_delta(d, tr)) != NULL) {
     865           0 :                 BAT *oui = NULL, *ouv = NULL;
     866           0 :                 if (!oui)
     867           0 :                         oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
     868           0 :                 if (access == RD_UPD_VAL)
     869           0 :                         ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
     870           0 :                 if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
     871           0 :                         bat_destroy(ui);
     872           0 :                         bat_destroy(uv);
     873           0 :                         bat_destroy(oui);
     874           0 :                         bat_destroy(ouv);
     875           0 :                         return NULL;
     876             :                 }
     877           0 :                 if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
     878             :                         return NULL;
     879             :                 d = o;
     880             :         }
     881     9294382 :         if (uv) {
     882     4647607 :                 bat_destroy(ui);
     883     4647607 :                 return uv;
     884             :         }
     885             :         return ui;
     886             : }
     887             : 
     888             : static BAT *
     889         519 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
     890             : {
     891         519 :         lock_column(tr->store, c->base.id);
     892         519 :         sql_delta *d = col_timestamp_delta(tr, c);
     893         519 :         int type = c->type.type->localtype;
     894             : 
     895         519 :         if (!d) {
     896           0 :                 unlock_column(tr->store, c->base.id);
     897           0 :                 return NULL;
     898             :         }
     899         519 :         if (d->cs.st == ST_DICT) {
     900           0 :                 BAT *b = quick_descriptor(d->cs.bid);
     901             : 
     902           0 :                 type = b->ttype;
     903             :         }
     904         519 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     905         519 :         unlock_column(tr->store, c->base.id);
     906         519 :         return bn;
     907             : }
     908             : 
     909             : static BAT *
     910           0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
     911             : {
     912           0 :         lock_column(tr->store, i->base.id);
     913           0 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     914           0 :         sql_delta *d = idx_timestamp_delta(tr, i);
     915             : 
     916           0 :         if (!d) {
     917           0 :                 unlock_column(tr->store, i->base.id);
     918           0 :                 return NULL;
     919             :         }
     920           0 :         BAT *bn = bind_ubat(tr, d, access, type, cnt);
     921           0 :         unlock_column(tr->store, i->base.id);
     922           0 :         return bn;
     923             : }
     924             : 
     925             : static BAT *
     926     9498280 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
     927             : {
     928     9498280 :         BAT *b;
     929             : 
     930     9498280 :         assert(access == RDONLY || access == QUICK || access == RD_EXT);
     931     9498280 :         assert(cs != NULL);
     932     9498280 :         if (access == QUICK)
     933      134150 :                 return quick_descriptor(cs->bid);
     934     9364130 :         if (access == RD_EXT)
     935         860 :                 return temp_descriptor(cs->ebid);
     936     9363270 :         assert(cs->bid);
     937     9363270 :         b = temp_descriptor(cs->bid);
     938     9364304 :         if (b == NULL)
     939             :                 return NULL;
     940     9364304 :         assert(b->batRestricted == BAT_READ);
     941             :         /* return slice */
     942     9364304 :         BAT *s = BATslice(b, 0, cnt);
     943     9348393 :         bat_destroy(b);
     944     9348393 :         return s;
     945             : }
     946             : 
     947             : static int
     948     4645063 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
     949             : {
     950     4645063 :         lock_column(tr->store, c->base.id);
     951     4645550 :         size_t cnt = count_col(tr, c, 0);
     952     4646855 :         sql_delta *d = col_timestamp_delta(tr, c);
     953     4646081 :         int type = c->type.type->localtype;
     954             : 
     955     4646081 :         if (!d) {
     956           0 :                 unlock_column(tr->store, c->base.id);
     957           0 :                 return LOG_ERR;
     958             :         }
     959     4646081 :         if (d->cs.st == ST_DICT) {
     960           2 :                 BAT *b = quick_descriptor(d->cs.bid);
     961             : 
     962           2 :                 type = b->ttype;
     963             :         }
     964             : 
     965     4646081 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     966     4646908 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     967             : 
     968     4646881 :         unlock_column(tr->store, c->base.id);
     969             : 
     970     4646647 :         if (*ui == NULL || *uv == NULL) {
     971           0 :                 bat_destroy(*ui);
     972           0 :                 bat_destroy(*uv);
     973           0 :                 return LOG_ERR;
     974             :         }
     975             :         return LOG_OK;
     976             : }
     977             : 
     978             : static int
     979          57 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
     980             : {
     981          57 :         lock_column(tr->store, i->base.id);
     982          57 :         size_t cnt = count_idx(tr, i, 0);
     983          57 :         sql_delta *d = idx_timestamp_delta(tr, i);
     984          57 :         int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
     985             : 
     986          57 :         if (!d) {
     987           0 :                 unlock_column(tr->store, i->base.id);
     988           0 :                 return LOG_ERR;
     989             :         }
     990             : 
     991          57 :         *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
     992          57 :         *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
     993             : 
     994          57 :         unlock_column(tr->store, i->base.id);
     995             : 
     996          57 :         if (*ui == NULL || *uv == NULL) {
     997           0 :                 bat_destroy(*ui);
     998           0 :                 bat_destroy(*uv);
     999           0 :                 return LOG_ERR;
    1000             :         }
    1001             :         return LOG_OK;
    1002             : }
    1003             : 
    1004             : static void *                                   /* BAT * */
    1005     9484353 : bind_col(sql_trans *tr, sql_column *c, int access)
    1006             : {
    1007     9484353 :         assert(access == QUICK || tr->active);
    1008     9484353 :         if (!isTable(c->t))
    1009             :                 return NULL;
    1010     9484353 :         sql_delta *d = col_timestamp_delta(tr, c);
    1011     9486778 :         if (!d)
    1012             :                 return NULL;
    1013     9486778 :         size_t cnt = count_col(tr, c, 0);
    1014     9490911 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
    1015         519 :                 return bind_ucol(tr, c, access, cnt);
    1016     9490392 :         BAT *b = cs_bind_bat( &d->cs, access, cnt);
    1017     9480612 :         assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
    1018             :         return b;
    1019             : }
    1020             : 
    1021             : static void *                                   /* BAT * */
    1022        8681 : bind_idx(sql_trans *tr, sql_idx * i, int access)
    1023             : {
    1024        8681 :         assert(access == QUICK || tr->active);
    1025        8681 :         if (!isTable(i->t))
    1026             :                 return NULL;
    1027        8681 :         sql_delta *d = idx_timestamp_delta(tr, i);
    1028        8685 :         if (!d)
    1029             :                 return NULL;
    1030        8685 :         size_t cnt = count_idx(tr, i, 0);
    1031        8685 :         if (access == RD_UPD_ID || access == RD_UPD_VAL)
    1032           0 :                 return bind_uidx(tr, i, access, cnt);
    1033        8685 :         return cs_bind_bat( &d->cs, access, cnt);
    1034             : }
    1035             : 
    1036             : static int
    1037        3888 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
    1038             : {
    1039        3888 :         if (!cs->uibid) {
    1040           0 :                 cs->uibid = e_bat(TYPE_oid);
    1041           0 :                 if (cs->uibid == BID_NIL)
    1042             :                         return LOG_ERR;
    1043             :         }
    1044        3888 :         if (!cs->uvbid) {
    1045           0 :                 BAT *cur = quick_descriptor(cs->bid);
    1046           0 :                 if (!cur)
    1047             :                         return LOG_ERR;
    1048           0 :                 int type = cur->ttype;
    1049           0 :                 cs->uvbid = e_bat(type);
    1050           0 :                 if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1051             :                         return LOG_ERR;
    1052             :         }
    1053        3888 :         BAT *ui = temp_descriptor(cs->uibid);
    1054        3888 :         BAT *uv = temp_descriptor(cs->uvbid);
    1055             : 
    1056        3888 :         if (ui == NULL || uv == NULL) {
    1057           0 :                 bat_destroy(ui);
    1058           0 :                 bat_destroy(uv);
    1059           0 :                 return LOG_ERR;
    1060             :         }
    1061        3888 :         assert(ui && uv);
    1062        3888 :         if (isEbat(ui)){
    1063         380 :                 temp_destroy(cs->uibid);
    1064         380 :                 cs->uibid = temp_copy(ui->batCacheid, true, true);
    1065         380 :                 bat_destroy(ui);
    1066         380 :                 if (cs->uibid == BID_NIL ||
    1067         380 :                     (ui = temp_descriptor(cs->uibid)) == NULL) {
    1068           0 :                         bat_destroy(uv);
    1069           0 :                         return LOG_ERR;
    1070             :                 }
    1071             :         }
    1072        3888 :         if (isEbat(uv)){
    1073         380 :                 temp_destroy(cs->uvbid);
    1074         380 :                 cs->uvbid = temp_copy(uv->batCacheid, true, true);
    1075         380 :                 bat_destroy(uv);
    1076         380 :                 if (cs->uvbid == BID_NIL ||
    1077         380 :                     (uv = temp_descriptor(cs->uvbid)) == NULL) {
    1078           0 :                         bat_destroy(ui);
    1079           0 :                         return LOG_ERR;
    1080             :                 }
    1081             :         }
    1082        3888 :         *Ui = ui;
    1083        3888 :         *Uv = uv;
    1084        3888 :         return LOG_OK;
    1085             : }
    1086             : 
    1087             : static int
    1088        6346 : segments_is_append(segment *s, sql_trans *tr, oid rid)
    1089             : {
    1090       93115 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1091       93115 :                 if (s->start <= rid && s->end > rid) {
    1092        6346 :                         if (s->ts == tr->tid && !s->deleted) {
    1093        2564 :                                 return 1;
    1094             :                         }
    1095             :                         break;
    1096             :                 }
    1097             :         }
    1098             :         return 0;
    1099             : }
    1100             : 
    1101             : static int
    1102        3782 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
    1103             : {
    1104       87937 :         for(; s; s=ATOMIC_PTR_GET(&s->next)) {
    1105       87937 :                 if (s->start <= rid && s->end > rid) {
    1106        3782 :                         if (s->ts >= tr->ts && s->deleted) {
    1107           0 :                                 return 1;
    1108             :                         }
    1109             :                         break;
    1110             :                 }
    1111             :         }
    1112             :         return 0;
    1113             : }
    1114             : 
    /*
     * tr_dup_delta: allocate a new sql_delta (ZNEW) and initialise it as a
     * struct copy of `bat`, with `next` reset to NULL and `cs.ts` set to the
     * calling transaction's id (tr->tid).
     *
     * The copy is shallow: the ids stored in n->cs are the same values as in
     * the original and nothing is re-referenced here. dict_append_bat, a
     * caller in this file, either calls temp_dup on the ids it keeps or
     * overwrites them.
     *
     * Returns the new delta, or NULL when the allocation fails.
     */
    1115             : static sql_delta *
    1116           0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
    1117             : {
    1118           0 :         sql_delta *n = ZNEW(sql_delta);
    1119           0 :         if (!n)
    1120             :                 return NULL;
    1121           0 :         *n = *bat;
    1122           0 :         n->next = NULL;
    1123           0 :         n->cs.ts = tr->tid;
    1124           0 :         return n;
    1125             : }
    1126             : 
    /*
     * dict_append_bat: prepare the values in `i` for appending to the column
     * described by (*batp)->cs, whose cs->ebid BAT (`u`, presumably the
     * dictionary) and cs->bid BAT (`b`, the stored offsets) are used below.
     *
     * tr    - current transaction; tr->tid is compared with cs->ts to decide
     *         whether the delta must first be duplicated with tr_dup_delta.
     * batp  - in/out: replaced by the duplicate when one is made.
     * i     - values to append.
     *
     * DICTprepare4append(&newoffsets, i, u) is called first; afterwards one of
     * three paths is taken, based on BATcount(u) against max_cnt:
     *   - append:     BATcount(u) < max_cnt; return newoffsets.
     *   - enlarge:    max_cnt == 256; replace cs->bid by the result of
     *                 DICTenlarge, set cs->cleared, return newoffsets.
     *   - decompress: max_cnt == 64*1024; apply pending updates (cs->ucnt) to a
     *                 copy of b, replace cs->bid by the result of
     *                 DICTdecompress_, drop ebid/uibid/uvbid, switch cs->st to
     *                 ST_DEFAULT, set cs->cleared, and return the caller's own
     *                 `i` unchanged (no extra reference is taken on it).
     *
     * Returns the BAT to append, or NULL on failure.
     */
    1127             : static BAT *
    1128          17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
    1129             : {
    1130          17 :         BAT *newoffsets = NULL;
    1131          17 :         sql_delta *bat = *batp;
    1132          17 :         column_storage *cs = &bat->cs;
    1133          17 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1134             : 
    1135          17 :         if (!u)
    1136             :                 return NULL;
                               /* Threshold is fixed from the size of u BEFORE the prepare call:
                                * 256 when u holds fewer than 256 entries, 64*1024 otherwise.
                                * NOTE(review): presumably the capacities of 1-byte and 2-byte
                                * offsets, and DICTprepare4append presumably may add entries to u
                                * (otherwise the >= test below could not fire for 256) - confirm. */
    1137          17 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
    1138          17 :         if (DICTprepare4append(&newoffsets, i, u) < 0) {
    1139           0 :                 bat_destroy(u);
    1140           0 :                 return NULL;
    1141             :         } else {
                                       /* set when *batp was replaced by a duplicate; ids copied into
                                        * the duplicate are then not temp_destroy'd below */
    1142          17 :                 int new = 0;
    1143             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1144          17 :                 if (BATcount(u) >= max_cnt) {
    1145           1 :                         if (max_cnt == 64*1024) { /* decompress */
    1146           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1147           0 :                                         bat_destroy(u);
    1148           0 :                                         return NULL;
    1149             :                                 }
    1150           0 :                                 if (cs->ucnt) {
                                                               /* pending updates exist: apply them to a private copy of b */
    1151           0 :                                         BAT *ui = NULL, *uv = NULL;
    1152           0 :                                         BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1153           0 :                                         bat_destroy(b);
    1154           0 :                                         if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1155           0 :                                                 bat_destroy(nb);
    1156           0 :                                                 bat_destroy(u);
    1157           0 :                                                 return NULL;
    1158             :                                         }
    1159           0 :                                         b = nb;
                                                               /* NOTE(review): this failure branch releases ui, uv, b and u but
                                                                * has no return, so control falls through: ui/uv are destroyed a
                                                                * second time and b/u are used by DICTdecompress_ after release.
                                                                * Looks like a missing `return NULL;` - confirm and fix in the
                                                                * real source file. */
    1160           0 :                                         if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1161           0 :                                                 bat_destroy(ui);
    1162           0 :                                                 bat_destroy(uv);
    1163           0 :                                                 bat_destroy(b);
    1164           0 :                                                 bat_destroy(u);
    1165             :                                         }
    1166           0 :                                         bat_destroy(ui);
    1167           0 :                                         bat_destroy(uv);
    1168             :                                 }
    1169           0 :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1170           0 :                                 bat_destroy(b);
                                                       /* this path expects the prepare call to have produced no offsets BAT */
    1171           0 :                                 assert(newoffsets == NULL);
    1172           0 :                                 if (!n) {
    1173           0 :                                         bat_destroy(u);
    1174           0 :                                         return NULL;
    1175             :                                 }
    1176           0 :                                 if (cs->ts != tr->tid) {
                                                               /* NOTE(review): u is not released on this error return */
    1177           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1178           0 :                                                 bat_destroy(n);
    1179           0 :                                                 return NULL;
    1180             :                                         }
    1181           0 :                                         cs = &(*batp)->cs;
    1182           0 :                                         new = 1;
    1183             :                                 }
    1184           0 :                                 if (cs->bid && !new)
    1185           0 :                                         temp_destroy(cs->bid);
                                                       /* NOTE(review): if transfer_to_systrans fails, u is not released
                                                        * and cs->bid may already have been temp_destroy'd above while
                                                        * still holding the old id - confirm intended */
    1186           0 :                                 n = transfer_to_systrans(n);
    1187           0 :                                 if (n == NULL)
    1188             :                                         return NULL;
    1189           0 :                                 bat_set_access(n, BAT_READ);
    1190           0 :                                 cs->bid = temp_create(n);
    1191           0 :                                 bat_destroy(n);
    1192           0 :                                 if (cs->ebid && !new)
    1193           0 :                                         temp_destroy(cs->ebid);
    1194           0 :                                 cs->ebid = 0;
    1195           0 :                                 cs->ucnt = 0;
    1196           0 :                                 if (cs->uibid && !new)
    1197           0 :                                         temp_destroy(cs->uibid);
    1198           0 :                                 if (cs->uvbid && !new)
    1199           0 :                                         temp_destroy(cs->uvbid);
    1200           0 :                                 cs->uibid = cs->uvbid = 0;
    1201           0 :                                 cs->st = ST_DEFAULT;
    1202           0 :                                 cs->cleared = true;
                                               /* i is left untouched here: the caller's own BAT is returned */
    1203             :                         } else {
    1204           1 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1205           0 :                                         bat_destroy(newoffsets);
    1206           0 :                                         bat_destroy(u);
    1207           0 :                                         return NULL;
    1208             :                                 }
    1209           1 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), PERSISTENT);
    1210           1 :                                 bat_destroy(b);
    1211           1 :                                 if (!n) {
    1212           0 :                                         bat_destroy(newoffsets);
    1213           0 :                                         bat_destroy(u);
    1214           0 :                                         return NULL;
    1215             :                                 }
    1216           1 :                                 if (cs->ts != tr->tid) {
                                                               /* NOTE(review): u and newoffsets are not released on this error return */
    1217           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1218           0 :                                                 bat_destroy(n);
    1219           0 :                                                 return NULL;
    1220             :                                         }
    1221           0 :                                         cs = &(*batp)->cs;
    1222           0 :                                         new = 1;
                                                               /* the duplicate keeps ebid/uibid/uvbid, so take extra references */
    1223           0 :                                         temp_dup(cs->ebid);
    1224           0 :                                         if (cs->uibid) {
    1225           0 :                                                 temp_dup(cs->uibid);
    1226           0 :                                                 temp_dup(cs->uvbid);
    1227             :                                         }
    1228             :                                 }
    1229           1 :                                 if (cs->bid && !new)
    1230           1 :                                         temp_destroy(cs->bid);
                                                       /* NOTE(review): if transfer_to_systrans fails, u and newoffsets
                                                        * are not released and cs->bid may already have been
                                                        * temp_destroy'd above - confirm intended */
    1231           1 :                                 n = transfer_to_systrans(n);
    1232           1 :                                 if (n == NULL)
    1233             :                                         return NULL;
    1234           1 :                                 bat_set_access(n, BAT_READ);
    1235           1 :                                 cs->bid = temp_create(n);
    1236           1 :                                 bat_destroy(n);
    1237           1 :                                 cs->cleared = true;
    1238           1 :                                 i = newoffsets;
    1239             :                         }
    1240             :                 } else { /* append */
    1241          16 :                         i = newoffsets;
    1242             :                 }
    1243             :         }
    1244          17 :         bat_destroy(u);
    1245          17 :         return i;
    1246             : }
    1247             : 
    /*
     * for_append_bat: prepare the values in `i` for appending to the column
     * storage `cs`, whose cs->bid BAT (`b`) is used below.
     *
     * cs           - column storage; modified only on the decompress path.
     * i            - values to append.
     * storage_type - string whose text from the 5th character on is parsed as
     *                a base-10 integer offset value (strtoll, no error check).
     *
     * FORprepare4append(&newoffsets, i, offsetval, b->ttype) is called first:
     *   - when it yields a newoffsets BAT, that BAT is returned (append path);
     *   - when it yields none, the column is decompressed: pending updates
     *     (cs->ucnt) are applied to a copy of b, cs->bid is replaced by the
     *     result of FORdecompress_, uibid/uvbid are dropped, cs->st becomes
     *     ST_DEFAULT, cs->cleared is set, and the caller's own `i` is returned
     *     unchanged.
     *
     * Returns the BAT to append, or NULL on failure.
     */
    1248             : static BAT *
    1249           0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
    1250             : {
                               /* NOTE(review): the +4 presumably skips a fixed 4-character prefix
                                * of the storage type string - confirm against the callers */
    1251           0 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1252           0 :         BAT *newoffsets = NULL;
    1253           0 :         BAT *b = NULL, *n = NULL;
    1254             : 
    1255           0 :         if (!(b = temp_descriptor(cs->bid)))
    1256             :                 return NULL;
    1257             : 
    1258           0 :         if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
    1259           0 :                 bat_destroy(b);
    1260           0 :                 return NULL;
    1261             :         } else {
    1262             :                 /* returns new offset bat if values within min/max, else decompress */
    1263           0 :                 if (!newoffsets) { /* decompress */
    1264           0 :                         if (cs->ucnt) {
                                                       /* pending updates exist: apply them to a private copy of b */
    1265           0 :                                 BAT *ui = NULL, *uv = NULL;
    1266           0 :                                 BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
    1267           0 :                                 bat_destroy(b);
    1268           0 :                                 if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1269           0 :                                         bat_destroy(nb);
    1270           0 :                                         return NULL;
    1271             :                                 }
    1272           0 :                                 b = nb;
                                                       /* NOTE(review): this failure branch releases ui, uv and b but has
                                                        * no return, so control falls through: ui/uv are destroyed a
                                                        * second time and b is used by FORdecompress_ after release.
                                                        * Looks like a missing `return NULL;` - confirm and fix in the
                                                        * real source file. */
    1273           0 :                                 if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
    1274           0 :                                         bat_destroy(ui);
    1275           0 :                                         bat_destroy(uv);
    1276           0 :                                         bat_destroy(b);
    1277             :                                 }
    1278           0 :                                 bat_destroy(ui);
    1279           0 :                                 bat_destroy(uv);
    1280             :                         }
    1281           0 :                         n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
    1282           0 :                         bat_destroy(b);
    1283           0 :                         if (!n)
    1284             :                                 return NULL;
    1285           0 :                         if (cs->bid)
    1286           0 :                                 temp_destroy(cs->bid);
                                               /* NOTE(review): if transfer_to_systrans fails, cs->bid has already
                                                * been temp_destroy'd above while still holding the old id -
                                                * confirm intended */
    1287           0 :                         n = transfer_to_systrans(n);
    1288           0 :                         if (n == NULL)
    1289             :                                 return NULL;
    1290           0 :                         bat_set_access(n, BAT_READ);
    1291           0 :                         cs->bid = temp_create(n);
    1292           0 :                         cs->ucnt = 0;
    1293           0 :                         if (cs->uibid)
    1294           0 :                                 temp_destroy(cs->uibid);
    1295           0 :                         if (cs->uvbid)
    1296           0 :                                 temp_destroy(cs->uvbid);
    1297           0 :                         cs->uibid = cs->uvbid = 0;
    1298           0 :                         cs->st = ST_DEFAULT;
    1299           0 :                         cs->cleared = true;
                                               /* hand n to the shared bat_destroy(b) at the end of the function */
    1300           0 :                         b = n;
    1301             :                 } else { /* append */
    1302             :                         i = newoffsets;
    1303             :                 }
    1304             :         }
    1305           0 :         bat_destroy(b);
    1306           0 :         return i;
    1307             : }
    1308             : 
    1309             : /*
    1310             :  * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
    1311             :  */
    1312             : static int
    1313        3009 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1314             : {
    1315        3009 :         int res = LOG_OK;
    1316        3009 :         sql_delta *bat = *batp;
    1317        3009 :         column_storage *cs = &bat->cs;
    1318        3009 :         BAT *otids = tids, *oupdates = updates;
    1319             : 
    1320        3009 :         if (!BATcount(tids))
    1321             :                 return LOG_OK;
    1322             : 
    1323        3009 :         if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
    1324           6 :                 tids = BATunmask(tids);
    1325           6 :                 if (!tids)
    1326             :                         return LOG_ERR;
    1327             :         }
    1328        3009 :         if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
    1329           0 :                 updates = BATunmask(updates);
    1330           0 :                 if (!updates) {
    1331           0 :                         if (otids != tids)
    1332           0 :                                 bat_destroy(tids);
    1333           0 :                         return LOG_ERR;
    1334             :                 }
    1335        3009 :         } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
    1336          42 :                 updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
    1337          42 :                 if (!updates) {
    1338           0 :                         if (otids != tids)
    1339           0 :                                 bat_destroy(tids);
    1340           0 :                         return LOG_ERR;
    1341             :                 }
    1342             :         }
    1343             : 
    1344        3009 :         if (cs->st == ST_DICT) {
    1345             :                 /* possibly a new array is returned */
    1346           4 :                 BAT *nupdates = dict_append_bat(tr, batp, updates);
    1347           4 :                 bat = *batp;
    1348           4 :                 cs = &bat->cs;
    1349           4 :                 if (oupdates != updates)
    1350           0 :                         bat_destroy(updates);
    1351           4 :                 updates = nupdates;
    1352           4 :                 if (!updates) {
    1353           0 :                         if (otids != tids)
    1354           0 :                                 bat_destroy(tids);
    1355           0 :                         return LOG_ERR;
    1356             :                 }
    1357             :         }
    1358             : 
    1359             :         /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1360             :         /* currently only one update delta is possible */
    1361        3009 :         lock_table(tr->store, t->base.id);
    1362        3009 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1363        3009 :         if (!is_new && !cs->cleared) {
    1364        2693 :                 if (!tids->tsorted /* make sure we have simple dense or oids */) {
    1365           0 :                         BAT *sorted, *order;
    1366           0 :                         if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
    1367           0 :                                 if (otids != tids)
    1368           0 :                                         bat_destroy(tids);
    1369           0 :                                 if (oupdates != updates)
    1370           0 :                                         bat_destroy(updates);
    1371           0 :                                 unlock_table(tr->store, t->base.id);
    1372           0 :                                 return LOG_ERR;
    1373             :                         }
    1374           0 :                         if (otids != tids)
    1375           0 :                                 bat_destroy(tids);
    1376           0 :                         tids = sorted;
    1377           0 :                         BAT *nupdates = BATproject(order, updates);
    1378           0 :                         bat_destroy(order);
    1379           0 :                         if (oupdates != updates)
    1380           0 :                                 bat_destroy(updates);
    1381           0 :                         updates = nupdates;
    1382           0 :                         if (!updates) {
    1383           0 :                                 bat_destroy(tids);
    1384           0 :                                 unlock_table(tr->store, t->base.id);
    1385           0 :                                 return LOG_ERR;
    1386             :                         }
    1387             :                 }
    1388        2693 :                 assert(tids->tsorted);
    1389        2693 :                 BAT *ui = NULL, *uv = NULL;
    1390             : 
    1391             :                 /* handle updates on just inserted bits */
    1392             :                 /* handle updates on updates (within one transaction) */
    1393        2693 :                 BATiter upi = bat_iterator(updates);
    1394        2693 :                 BUN cnt = 0, ucnt = BATcount(tids);
    1395        2693 :                 BAT *b, *ins = NULL;
    1396        2693 :                 int *msk = NULL;
    1397             : 
    1398        2693 :                 if((b = temp_descriptor(cs->bid)) == NULL)
    1399             :                         res = LOG_ERR;
    1400             : 
    1401        2693 :                 if (res == LOG_OK && BATtdense(tids)) {
    1402        2464 :                         oid start = tids->tseqbase, offset = start;
    1403        2464 :                         oid end = start + ucnt;
    1404             : 
    1405        8302 :                         for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
    1406        6460 :                                 if (seg->start <= start && seg->end > start) {
    1407             :                                         /* check for delete conflicts */
    1408        2464 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1409           0 :                                                 res = LOG_CONFLICT;
    1410           0 :                                                 continue;
    1411             :                                         }
    1412             : 
    1413             :                                         /* check for inplace updates */
    1414        2464 :                                         BUN lend = end < seg->end?end:seg->end;
    1415        2464 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1416         133 :                                                 if (!ins) {
    1417         133 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1418         133 :                                                         if (!ins)
    1419             :                                                                 res = LOG_ERR;
    1420             :                                                         else {
    1421         133 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1422         133 :                                                                 msk = (int*)Tloc(ins, 0);
    1423         133 :                                                                 BUN end = (ucnt+31)/32;
    1424         133 :                                                                 memset(msk, 0, end * sizeof(int));
    1425             :                                                         }
    1426             :                                                 }
    1427         726 :                                                 for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
    1428         593 :                                                         const void *upd = BUNtail(upi, rid-offset);
    1429         593 :                                                         if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1430           0 :                                                                 res = LOG_ERR;
    1431             : 
    1432         593 :                                                         oid word = i/32;
    1433         593 :                                                         int pos = i%32;
    1434         593 :                                                         msk[word] |= 1U<<pos;
    1435         593 :                                                         cnt++;
    1436             :                                                 }
    1437             :                                         }
    1438             :                                 }
    1439        6460 :                                 if (end < seg->end)
    1440             :                                         break;
    1441             :                         }
    1442         232 :                 } else if (res == LOG_OK && complex_cand(tids)) {
    1443           3 :                         struct canditer ci;
    1444           3 :                         segment *seg = s->segs->h;
    1445           3 :                         canditer_init(&ci, NULL, tids);
    1446           3 :                         BUN i = 0;
    1447        1036 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1448        1033 :                                 oid rid = canditer_next(&ci);
    1449        1033 :                                 if (seg->end <= rid)
    1450          13 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1451        1020 :                                 else if (seg->start <= rid && seg->end > rid) {
    1452             :                                         /* check for delete conflicts */
    1453        1020 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1454           0 :                                                 res = LOG_CONFLICT;
    1455           0 :                                                 continue;
    1456             :                                         }
    1457             : 
    1458             :                                         /* check for inplace updates */
    1459        1020 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1460           0 :                                                 if (!ins) {
    1461           0 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1462           0 :                                                         if (!ins) {
    1463             :                                                                 res = LOG_ERR;
    1464             :                                                                 break;
    1465             :                                                         } else {
    1466           0 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1467           0 :                                                                 msk = (int*)Tloc(ins, 0);
    1468           0 :                                                                 BUN end = (ucnt+31)/32;
    1469           0 :                                                                 memset(msk, 0, end * sizeof(int));
    1470             :                                                         }
    1471             :                                                 }
    1472           0 :                                                 ptr upd = BUNtail(upi, i);
    1473           0 :                                                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
    1474           0 :                                                         res = LOG_ERR;
    1475             : 
    1476           0 :                                                 oid word = i/32;
    1477           0 :                                                 int pos = i%32;
    1478           0 :                                                 msk[word] |= 1U<<pos;
    1479           0 :                                                 cnt++;
    1480             :                                         }
    1481        1020 :                                         i++;
    1482             :                                 }
    1483             :                         }
    1484         226 :                 } else if (res == LOG_OK) {
    1485         226 :                         BUN i = 0;
    1486         226 :                         oid *rid = Tloc(tids,0);
    1487         226 :                         segment *seg = s->segs->h;
    1488       28077 :                         while ( seg && res == LOG_OK && i < ucnt) {
    1489       27851 :                                 if (seg->end <= rid[i])
    1490        8113 :                                         seg = ATOMIC_PTR_GET(&seg->next);
    1491       19738 :                                 else if (seg->start <= rid[i] && seg->end > rid[i]) {
    1492             :                                         /* check for delete conflicts */
    1493       19738 :                                         if (seg->ts >= tr->ts && seg->deleted) {
    1494           0 :                                                 res = LOG_CONFLICT;
    1495           0 :                                                 continue;
    1496             :                                         }
    1497             : 
    1498             :                                         /* check for inplace updates */
    1499       19738 :                                         if (seg->ts == tr->tid && !seg->deleted) {
    1500         417 :                                                 if (!ins) {
    1501          47 :                                                         ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
    1502          47 :                                                         if (!ins) {
    1503             :                                                                 res = LOG_ERR;
    1504             :                                                                 break;
    1505             :                                                         } else {
    1506          47 :                                                                 BATsetcount(ins, ucnt); /* all full updates  */
    1507          47 :                                                                 msk = (int*)Tloc(ins, 0);
    1508          47 :                                                                 BUN end = (ucnt+31)/32;
    1509          47 :                                                                 memset(msk, 0, end * sizeof(int));
    1510             :                                                         }
    1511             :                                                 }
    1512         417 :                                                 const void *upd = BUNtail(upi, i);
    1513         417 :                                                 if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
    1514           0 :                                                         res = LOG_ERR;
    1515             : 
    1516         417 :                                                 oid word = i/32;
    1517         417 :                                                 int pos = i%32;
    1518         417 :                                                 msk[word] |= 1U<<pos;
    1519         417 :                                                 cnt++;
    1520             :                                         }
    1521       19738 :                                         i++;
    1522             :                                 }
    1523             :                         }
    1524             :                 }
    1525             : 
    1526        2693 :                 if (res == LOG_OK && cnt < ucnt) {   /* now handle real updates */
    1527        2513 :                         if (cs->ucnt == 0) {
    1528        2407 :                                 if (cnt) {
    1529           0 :                                         BAT *nins = BATmaskedcands(0, ucnt, ins, false);
    1530           0 :                                         if (nins) {
    1531           0 :                                                 ui = BATproject(nins, tids);
    1532           0 :                                                 uv = BATproject(nins, updates);
    1533           0 :                                                 bat_destroy(nins);
    1534             :                                         }
    1535             :                                 } else {
    1536        2407 :                                         ui = temp_descriptor(tids->batCacheid);
    1537        2407 :                                         uv = temp_descriptor(updates->batCacheid);
    1538             :                                 }
    1539        2407 :                                 if (!ui || !uv) {
    1540             :                                         res = LOG_ERR;
    1541             :                                 } else {
    1542        2407 :                                         temp_destroy(cs->uibid);
    1543        2407 :                                         temp_destroy(cs->uvbid);
    1544        2407 :                                         ui = transfer_to_systrans(ui);
    1545        2407 :                                         uv = transfer_to_systrans(uv);
    1546        2407 :                                         if (ui == NULL || uv == NULL) {
    1547           0 :                                                 BBPreclaim(ui);
    1548           0 :                                                 BBPreclaim(uv);
    1549             :                                                 res = LOG_ERR;
    1550             :                                         } else {
    1551        2407 :                                                 cs->uibid = temp_create(ui);
    1552        2407 :                                                 cs->uvbid = temp_create(uv);
    1553        2407 :                                                 cs->ucnt = BATcount(ui);
    1554             :                                         }
    1555             :                                 }
    1556             :                         } else {
    1557         106 :                                 BAT *nui = NULL, *nuv = NULL;
    1558             : 
    1559             :                                 /* merge taking msk of inserted into account */
    1560         106 :                                 if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
    1561             :                                         res = LOG_ERR;
    1562             : 
    1563         106 :                                 if (res == LOG_OK) {
    1564         106 :                                         const void *upd = NULL;
    1565         106 :                                         nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
    1566         106 :                                         nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
    1567             : 
    1568         106 :                                         if (!nui || !nuv) {
    1569             :                                                 res = LOG_ERR;
    1570             :                                         } else {
    1571         106 :                                                 BATiter ovi = bat_iterator(uv);
    1572             : 
    1573             :                                                 /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
    1574         106 :                                                 BUN uip = 0, uie = BATcount(ui);
    1575         106 :                                                 BUN nip = 0, nie = BATcount(tids);
    1576         106 :                                                 oid uiseqb = ui->tseqbase;
    1577         106 :                                                 oid niseqb = tids->tseqbase;
    1578         106 :                                                 oid *uipt = NULL, *nipt = NULL;
    1579         106 :                                                 BATiter uii = bat_iterator(ui);
    1580         106 :                                                 BATiter tidsi = bat_iterator(tids);
    1581         106 :                                                 if (!BATtdensebi(&uii))
    1582         105 :                                                         uipt = uii.base;
    1583         106 :                                                 if (!BATtdensebi(&tidsi))
    1584         105 :                                                         nipt = tidsi.base;
    1585       26789 :                                                 while (uip < uie && nip < nie && res == LOG_OK) {
    1586       26683 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1587       26683 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1588             : 
    1589       26683 :                                                         if (uiv < niv) {
    1590       15068 :                                                                 upd = BUNtail(ovi, uip);
    1591       30136 :                                                                 if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1592       15068 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1593             :                                                                         res = LOG_ERR;
    1594       15068 :                                                                 uip++;
    1595       11615 :                                                         } else if (uiv == niv) {
    1596             :                                                                 /* handle == */
    1597          18 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1598          18 :                                                                         upd = BUNtail(upi, nip);
    1599          36 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1600          18 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1601             :                                                                                 res = LOG_ERR;
    1602             :                                                                 } else {
    1603           0 :                                                                         upd = BUNtail(ovi, uip);
    1604           0 :                                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1605           0 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1606             :                                                                                 res = LOG_ERR;
    1607             :                                                                 }
    1608          18 :                                                                 uip++;
    1609          18 :                                                                 nip++;
    1610             :                                                         } else { /* uiv > niv */
    1611       11597 :                                                                 if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1612       11597 :                                                                         upd = BUNtail(upi, nip);
    1613       23194 :                                                                         if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1614       11597 :                                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1615             :                                                                                 res = LOG_ERR;
    1616             :                                                                 }
    1617       11597 :                                                                 nip++;
    1618             :                                                         }
    1619             :                                                 }
    1620         542 :                                                 while (uip < uie && res == LOG_OK) {
    1621         436 :                                                         oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
    1622         436 :                                                         upd = BUNtail(ovi, uip);
    1623         872 :                                                         if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
    1624         436 :                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1625             :                                                                 res = LOG_ERR;
    1626         436 :                                                         uip++;
    1627             :                                                 }
    1628         799 :                                                 while (nip < nie && res == LOG_OK) {
    1629         693 :                                                         oid niv = (nipt)?nipt[nip]: niseqb+nip;
    1630         693 :                                                         if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
    1631         693 :                                                                 upd = BUNtail(upi, nip);
    1632        1386 :                                                                 if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
    1633         693 :                                                                                         BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
    1634             :                                                                         res = LOG_ERR;
    1635             :                                                         }
    1636         693 :                                                         nip++;
    1637             :                                                 }
    1638         106 :                                                 bat_iterator_end(&uii);
    1639         106 :                                                 bat_iterator_end(&tidsi);
    1640         106 :                                                 bat_iterator_end(&ovi);
    1641         106 :                                                 if (res == LOG_OK) {
    1642         106 :                                                         temp_destroy(cs->uibid);
    1643         106 :                                                         temp_destroy(cs->uvbid);
    1644         106 :                                                         nui = transfer_to_systrans(nui);
    1645         106 :                                                         nuv = transfer_to_systrans(nuv);
    1646         106 :                                                         if (nui == NULL || nuv == NULL) {
    1647             :                                                                 res = LOG_ERR;
    1648             :                                                         } else {
    1649         106 :                                                                 cs->uibid = temp_create(nui);
    1650         106 :                                                                 cs->uvbid = temp_create(nuv);
    1651         106 :                                                                 cs->ucnt = BATcount(nui);
    1652             :                                                         }
    1653             :                                                 }
    1654             :                                         }
    1655         106 :                                         bat_destroy(nui);
    1656         106 :                                         bat_destroy(nuv);
    1657             :                                 }
    1658             :                         }
    1659             :                 }
    1660        2693 :                 bat_iterator_end(&upi);
    1661        2693 :                 bat_destroy(b);
    1662        2693 :                 unlock_table(tr->store, t->base.id);
    1663        2693 :                 bat_destroy(ins);
    1664        2693 :                 bat_destroy(ui);
    1665        2693 :                 bat_destroy(uv);
    1666        2693 :                 if (otids != tids)
    1667           4 :                         bat_destroy(tids);
    1668        2693 :                 if (oupdates != updates)
    1669           5 :                         bat_destroy(updates);
    1670        2693 :                 return res;
    1671             :         } else if (is_new || cs->cleared) {
    1672         316 :                 BAT *b = temp_descriptor(cs->bid);
    1673             : 
    1674         316 :                 if (b == NULL) {
    1675             :                         res = LOG_ERR;
    1676             :                 } else {
    1677         316 :                         if (BATcount(b)==0) {
    1678           1 :                                 if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
    1679           0 :                                         res = LOG_ERR;
    1680         315 :                         } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
    1681           0 :                                 res = LOG_ERR;
    1682         316 :                         BBPcold(b->batCacheid);
    1683         316 :                         bat_destroy(b);
    1684             :                 }
    1685             :         }
    1686         316 :         unlock_table(tr->store, t->base.id);
    1687         316 :         if (otids != tids)
    1688           2 :                 bat_destroy(tids);
    1689         316 :         if (oupdates != updates)
    1690          41 :                 bat_destroy(updates);
    1691             :         return res;
    1692             : }
    1693             : 
    /*
     * Thin wrapper around cs_update_bat(): forwards all arguments unchanged
     * and returns whatever cs_update_bat() returns.
     */
    1694             : static int
    1695        3009 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
    1696             : {
    1697        3009 :         return cs_update_bat(tr, bat, t, tids, updates, is_new);
    1698             : }
    1699             : 
    /*
     * Prepare the cnt values at i for storage in the dictionary-encoded
     * column described by (*batp)->cs.
     *
     * The dictionary BAT (cs->ebid) is opened and handed to
     * DICTprepare4append_vals(), which fills in newoffsets.  Afterwards:
     *  - dictionary count below max_cnt: newoffsets is returned;
     *  - count >= max_cnt and max_cnt == 64*1024: cs->bid is replaced by the
     *    result of DICTdecompress_(), cs->ebid is dropped, cs->st becomes
     *    ST_DEFAULT, cs->cleared is set, and the caller's i is returned as is;
     *  - otherwise: cs->bid is replaced by the result of DICTenlarge(),
     *    cs->cleared is set, and newoffsets is returned.
     * In both rewriting branches, when cs->ts != tr->tid the delta is first
     * duplicated with tr_dup_delta() and *batp is updated to the duplicate.
     *
     * Returns NULL on failure.  A non-NULL result that differs from i is
     * newoffsets; cs_update_val() releases such a result with GDKfree().
     */
    1700             : static void *
    1701           4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
    1702             : {
    1703           4 :         void *newoffsets = NULL;
    1704           4 :         sql_delta *bat = *batp;
    1705           4 :         column_storage *cs = &bat->cs;
    1706           4 :         BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
    1707             : 
    1708           4 :         if (!u)
    1709             :                 return NULL;
                               /* threshold is sampled before the prepare call below; presumably
                                * DICTprepare4append_vals() can add entries to u, which is how the
                                * count can later reach a max_cnt of 256 -- TODO confirm */
    1710           4 :         BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
    1711           4 :         if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
    1712           0 :                 bat_destroy(u);
    1713           0 :                 return NULL;
    1714             :         } else {
                                       /* new is set once the delta was duplicated for this transaction */
    1715           4 :                 int new = 0;
    1716             :                 /* returns new offset bat (ie to be appended), possibly with larger type ! */
    1717           4 :                 if (BATcount(u) >= max_cnt) {
    1718           0 :                         if (max_cnt == 64*1024) { /* decompress */
    1719           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1720           0 :                                         bat_destroy(u);
    1721           0 :                                         return NULL;
    1722             :                                 }
    1723           0 :                                 n = DICTdecompress_(b, u, PERSISTENT);
    1724             :                                 /* TODO decompress updates if any */
    1725           0 :                                 bat_destroy(b);
                                                       /* no offsets buffer is expected on this path, so nothing to free */
    1726           0 :                                 assert(newoffsets == NULL);
    1727           0 :                                 if (!n) {
    1728           0 :                                         bat_destroy(u);
    1729           0 :                                         return NULL;
    1730             :                                 }
    1731           0 :                                 if (cs->ts != tr->tid) {
    1732           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1733           0 :                                                 bat_destroy(n);
    1734           0 :                                                 bat_destroy(u);
    1735           0 :                                                 return NULL;
    1736             :                                         }
    1737           0 :                                         cs = &(*batp)->cs;
    1738           0 :                                         new = 1;
    1739           0 :                                         cs->uibid = cs->uvbid = 0;
    1740             :                                 }
                                                       /* on a fresh duplicate the old bid/ebid are not released here */
    1741           0 :                                 if (cs->bid && !new)
    1742           0 :                                         temp_destroy(cs->bid);
    1743           0 :                                 n = transfer_to_systrans(n);
    1744           0 :                                 if (n == NULL) {
    1745           0 :                                         bat_destroy(u);
    1746           0 :                                         return NULL;
    1747             :                                 }
    1748           0 :                                 bat_set_access(n, BAT_READ);
    1749           0 :                                 cs->bid = temp_create(n);
    1750           0 :                                 bat_destroy(n);
    1751           0 :                                 if (cs->ebid && !new)
    1752           0 :                                         temp_destroy(cs->ebid);
    1753           0 :                                 cs->ebid = 0;
    1754           0 :                                 cs->st = ST_DEFAULT;
    1755             :                                 /* at append_col the column's storage type is cleared */
    1756           0 :                                 cs->cleared = true;
    1757             :                         } else {
                                                       /* dictionary outgrew the small threshold: widen the offsets BAT */
    1758           0 :                                 if (!(b = temp_descriptor(cs->bid))) {
    1759           0 :                                         GDKfree(newoffsets);
    1760           0 :                                         bat_destroy(u);
    1761           0 :                                         return NULL;
    1762             :                                 }
    1763           0 :                                 n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, PERSISTENT);
    1764           0 :                                 bat_destroy(b);
    1765           0 :                                 if (!n) {
    1766           0 :                                         GDKfree(newoffsets);
    1767           0 :                                         bat_destroy(u);
    1768           0 :                                         return NULL;
    1769             :                                 }
    1770           0 :                                 if (cs->ts != tr->tid) {
                                                               /* NOTE(review): unlike the two failure paths above, this one and
                                                                * the transfer_to_systrans() failure below return without
                                                                * GDKfree(newoffsets) -- looks like a leak, confirm */
    1771           0 :                                         if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
    1772           0 :                                                 bat_destroy(u);
    1773           0 :                                                 bat_destroy(n);
    1774           0 :                                                 return NULL;
    1775             :                                         }
    1776           0 :                                         cs = &(*batp)->cs;
    1777           0 :                                         new = 1;
    1778           0 :                                         temp_dup(cs->ebid);
    1779           0 :                                         if (cs->uibid) {
    1780           0 :                                                 temp_dup(cs->uibid);
    1781           0 :                                                 temp_dup(cs->uvbid);
    1782             :                                         }
    1783             :                                 }
    1784           0 :                                 if (cs->bid)
    1785           0 :                                         temp_destroy(cs->bid);
    1786           0 :                                 n = transfer_to_systrans(n);
    1787           0 :                                 if (n == NULL) {
    1788           0 :                                         bat_destroy(u);
    1789           0 :                                         return NULL;
    1790             :                                 }
    1791           0 :                                 bat_set_access(n, BAT_READ);
    1792           0 :                                 cs->bid = temp_create(n);
    1793           0 :                                 bat_destroy(n);
    1794           0 :                                 cs->cleared = true;
    1795           0 :                                 i = newoffsets;
    1796             :                         }
    1797             :                 } else { /* append */
    1798           4 :                         i = newoffsets;
    1799             :                 }
    1800             :         }
    1801           4 :         bat_destroy(u);
    1802           4 :         return i;
    1803             : }
    1804             : 
    /*
     * Prepare the cnt values at i for storage in a column whose storage type
     * string carries a numeric offset.
     *
     * The offset is parsed as a base-10 number starting at storage_type+4
     * (presumably skipping a 4-character prefix such as "FOR-" -- TODO confirm).
     * FORprepare4append_vals() is then asked for newoffsets:
     *  - newoffsets set: it is returned;
     *  - newoffsets NULL: cs->bid is replaced by the result of
     *    FORdecompress_(), cs->st becomes ST_DEFAULT, cs->cleared is set and
     *    the caller's i is returned unchanged.
     *
     * Returns NULL on failure.
     */
    1805             : static void *
    1806           1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
    1807             : {
    1808           1 :         lng offsetval = strtoll(storage_type+4, NULL, 10);
    1809           1 :         void *newoffsets = NULL;
    1810           1 :         BAT *b = NULL, *n = NULL;
    1811             : 
    1812           1 :         if (!(b = temp_descriptor(cs->bid)))
    1813             :                 return NULL;
    1814             : 
    1815           1 :         if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
    1816           0 :                 bat_destroy(b);
    1817           0 :                 return NULL;
    1818             :         } else {
    1819             :                 /* returns new offset bat if values within min/max, else decompress */
    1820           1 :                 if (!newoffsets) {
    1821           1 :                         n = FORdecompress_(b, offsetval, tt, PERSISTENT);
    1822           1 :                         bat_destroy(b);
    1823           1 :                         if (!n)
    1824             :                                 return NULL;
    1825             :                         /* TODO decompress updates if any */
    1826           1 :                         if (cs->bid)
    1827           1 :                                 temp_destroy(cs->bid);
                                               /* NOTE(review): if the transfer fails, cs->bid still holds the id
                                                * passed to temp_destroy() above -- confirm callers cope with that */
    1828           1 :                         n = transfer_to_systrans(n);
    1829           1 :                         if (n == NULL)
    1830             :                                 return NULL;
    1831           1 :                         bat_set_access(n, BAT_READ);
    1832           1 :                         cs->bid = temp_create(n);
    1833           1 :                         cs->st = ST_DEFAULT;
    1834             :                         /* at append_col the column's storage type is cleared */
    1835           1 :                         cs->cleared = true;
                                               /* b was already released; point it at n so the common
                                                * bat_destroy(b) at the end releases the new descriptor */
    1836           1 :                         b = n;
    1837             :                 } else { /* append */
    1838             :                         i = newoffsets;
    1839             :                 }
    1840             :         }
    1841           1 :         bat_destroy(b);
    1842           1 :         return i;
    1843             : }
    1844             : 
    /*
     * Update the single row rid of a column with the value at upd.
     *
     * For ST_DICT columns the value is first passed through
     * dict_append_val(), which may return a different buffer and may replace
     * *batp; such a replacement buffer is released with GDKfree() on every
     * exit path (the "oupd != upd" checks).
     *
     * Two strategies:
     *  - in place (is_new, cs->cleared, or segments_is_append() reports true):
     *    the base BAT cs->bid is overwritten with void_inplace();
     *  - otherwise: (rid, value) is appended to the update BATs obtained from
     *    cs_real_update_bats() and cs->ucnt is incremented.
     *
     * Returns LOG_OK, LOG_ERR on failure, or LOG_CONFLICT when
     * segments_is_deleted() reports the row as deleted.
     */
    1845             : static int
    1846        6346 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
    1847             : {
                               /* remember the caller's buffer so a replacement can be told apart later */
    1848        6346 :         void *oupd = upd;
    1849        6346 :         sql_delta *bat = *batp;
    1850        6346 :         column_storage *cs = &bat->cs;
    1851        6346 :         storage *s = ATOMIC_PTR_GET(&t->data);
    1852        6346 :         assert(!is_oid_nil(rid));
                               /* NOTE(review): inplace is computed before dict_append_val() below, which
                                * can set cs->cleared and swap *batp -- confirm the stale value is intended */
    1853        6346 :         int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
    1854             : 
    1855        6346 :         if (cs->st == ST_DICT) {
    1856             :                 /* possibly a new array is returned */
    1857           0 :                 upd = dict_append_val(tr, batp, upd, 1);
                                       /* re-read: dict_append_val() may have replaced *batp */
    1858           0 :                 bat = *batp;
    1859           0 :                 cs = &bat->cs;
    1860           0 :                 if (!upd)
    1861             :                         return LOG_ERR;
    1862             :         }
    1863             : 
    1864             :         /* check if rid is insert ? */
    1865        6346 :         if (!inplace) {
    1866             :                 /* check conflict */
    1867        3782 :                 if (segments_is_deleted(s->segs->h, tr, rid)) {
    1868           0 :                         if (oupd != upd)
    1869           0 :                                 GDKfree(upd);
    1870           0 :                         return LOG_CONFLICT;
    1871             :                 }
    1872        3782 :                 BAT *ui, *uv;
    1873             : 
    1874             :                 /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
    1875             :                 /* currently only one update delta is possible */
    1876        3782 :                 if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
    1877           0 :                         if (oupd != upd)
    1878           0 :                                 GDKfree(upd);
    1879           0 :                         return LOG_ERR;
    1880             :                 }
    1881             : 
    1882        3782 :                 assert(uv->ttype);
    1883        3782 :                 assert(BATcount(ui) == BATcount(uv));
    1884        7564 :                 if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
    1885        3782 :                     BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
    1886           0 :                         if (oupd != upd)
    1887           0 :                                 GDKfree(upd);
    1888           0 :                         bat_destroy(ui);
    1889           0 :                         bat_destroy(uv);
    1890           0 :                         return LOG_ERR;
    1891             :                 }
    1892        3782 :                 assert(BATcount(ui) == BATcount(uv));
    1893        3782 :                 bat_destroy(ui);
    1894        3782 :                 bat_destroy(uv);
    1895        3782 :                 cs->ucnt++;
    1896             :         } else {
    1897        2564 :                 BAT *b = NULL;
    1898             : 
    1899        2564 :                 if((b = temp_descriptor(cs->bid)) == NULL) {
    1900           0 :                         if (oupd != upd)
    1901           0 :                                 GDKfree(upd);
    1902           0 :                         return LOG_ERR;
    1903             :                 }
    1904        2564 :                 if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
    1905           0 :                         if (oupd != upd)
    1906           0 :                                 GDKfree(upd);
    1907           0 :                         bat_destroy(b);
    1908           0 :                         return LOG_ERR;
    1909             :                 }
    1910        2564 :                 bat_destroy(b);
    1911             :         }
    1912        6346 :         if (oupd != upd)
    1913           0 :                 GDKfree(upd);
    1914             :         return LOG_OK;
    1915             : }
    1916             : 
    /*
     * Single-value update entry point: runs cs_update_val() between
     * lock_table() and unlock_table() on t->base.id and returns its result.
     */
    1917             : static int
    1918        6346 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
    1919             : {
    1920        6346 :         int res = LOG_OK;
    1921        6346 :         lock_table(tr->store, t->base.id);
    1922        6346 :         res = cs_update_val(tr, bat, t, rid, upd, is_new);
    1923        6346 :         unlock_table(tr->store, t->base.id);
    1924        6346 :         return res;
    1925             : }
    1926             : 
    /*
     * Initialise cs as a duplicate of ocs (no-op returning LOG_OK when ocs is
     * NULL).
     *
     *  - base BAT: with temp set, cs->bid becomes the result of
     *    temp_copy(cs->bid, true, false); otherwise the id is shared and
     *    temp_dup() is called on it;
     *  - cs->ebid, when non-zero, is shared via temp_dup();
     *  - the update side always starts empty: ucnt = 0 and uibid/uvbid are
     *    fresh e_bat() results (TYPE_oid and type respectively);
     *  - cs->st is copied from ocs.
     *
     * Returns LOG_ERR when temp_copy() or either e_bat() yields BID_NIL,
     * LOG_OK otherwise.  tr is unused.
     */
    1927             : static int
    1928      159070 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
    1929             : {
    1930      159070 :         (void)tr;
    1931      159070 :         if (!ocs)
    1932             :                 return LOG_OK;
    1933      159070 :         cs->bid = ocs->bid;
    1934      159070 :         cs->ebid = ocs->ebid;
                               /* the next three copies are overwritten further down */
    1935      159070 :         cs->uibid = ocs->uibid;
    1936      159070 :         cs->uvbid = ocs->uvbid;
    1937      159070 :         cs->ucnt = ocs->ucnt;
    1938             : 
    1939      159070 :         if (temp) {
    1940       26017 :                 cs->bid = temp_copy(cs->bid, true, false);
    1941       25991 :                 if (cs->bid == BID_NIL)
    1942             :                         return LOG_ERR;
    1943             :         } else {
    1944      133053 :                 temp_dup(cs->bid);
    1945             :         }
    1946      159051 :         if (cs->ebid)
    1947           6 :                 temp_dup(cs->ebid);
    1948      159051 :         cs->ucnt = 0;
    1949      159051 :         cs->uibid = e_bat(TYPE_oid);
    1950      159101 :         cs->uvbid = e_bat(type);
    1951      159103 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    1952             :                 return LOG_ERR;
    1953      159103 :         cs->st = ocs->st;
    1954      159103 :         return LOG_OK;
    1955             : }
    1956             : 
    /*
     * Drop one reference to delta b.  If ATOMIC_DEC() on b->cs.refcnt still
     * yields a positive value nothing else happens.  Otherwise, with
     * recursive set, b->next is destroyed first; then every non-zero BAT id
     * in b->cs (uibid, uvbid, bid, ebid) is passed to temp_destroy(), the ids
     * are zeroed and b itself is released with _DELETE().
     */
    1957             : static void
    1958      308588 : destroy_delta(sql_delta *b, bool recursive)
    1959             : {
    1960      308588 :         if (ATOMIC_DEC(&b->cs.refcnt) > 0)
    1961             :                 return;
    1962      290052 :         if (recursive && b->next)
    1963       29848 :                 destroy_delta(b->next, true);
    1964      290052 :         if (b->cs.uibid)
    1965       94695 :                 temp_destroy(b->cs.uibid);
    1966      290052 :         if (b->cs.uvbid)
    1967       94695 :                 temp_destroy(b->cs.uvbid);
    1968      290052 :         if (b->cs.bid)
    1969      290052 :                 temp_destroy(b->cs.bid);
    1970      290052 :         if (b->cs.ebid)
    1971          61 :                 temp_destroy(b->cs.ebid);
    1972      290052 :         b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
    1973      290052 :         _DELETE(b);
    1974             : }
    1975             : 
    /*
     * Return the delta of column c that transaction tr should work on.
     *
     * update_conflict doubles as a mode switch: callers passing NULL take the
     * "append" paths (see the inline comments), callers passing a pointer get
     * *update_conflict set to true when the function gives up because of a
     * conflict.
     *
     *  1. The current head delta is returned as is when it already carries
     *     tr->tid, or when update_conflict is NULL and its timestamp is below
     *     TRANSACTION_ID_BASE or tr_version_of_parent() accepts it.
     *  2. If the head's timestamp is >= TRANSACTION_ID_BASE and it is not a
     *     version of tr's parent: with update_conflict set the conflict is
     *     flagged and NULL returned; without it, timestamp_delta() is returned
     *     unless the head is marked cleared, in which case NULL is returned.
     *  3. Otherwise a new delta is allocated, filled by dup_cs() from the
     *     delta timestamp_delta() selects, stamped with tr->tid, linked in
     *     front of it and installed in c->data with a compare-and-swap.
     *
     * Returns NULL on allocation/duplication failure or conflict.
     */
    1976             : static sql_delta *
    1977    15297642 : bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
    1978             : {
    1979    15297642 :         sql_delta *obat = ATOMIC_PTR_GET(&c->data);
    1980             : 
    1981    15297642 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    1982    15164642 :                 return obat;
    1983      133000 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    1984             :                 /* abort */
    1985          12 :                 if (update_conflict)
    1986           4 :                         *update_conflict = true;
    1987           8 :                 else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
    1988           8 :                         return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
    1989           4 :                 return NULL;
    1990             :         }
    1991      132988 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
    1992             :                 return NULL;
    1993      132991 :         sql_delta* bat = ZNEW(sql_delta);
    1994      133036 :         if (!bat)
    1995             :                 return NULL;
    1996      133036 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    1997      133036 :         if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
    1998           0 :                 destroy_delta(bat, false);
    1999           0 :                 return NULL;
    2000             :         }
    2001      133049 :         bat->cs.ts = tr->tid;
    2002             :         /* only one writer else abort */
    2003      133049 :         bat->next = obat;
                               /* bat->next serves as the expected value of the compare-and-swap */
    2004      133049 :         if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
                                       /* detach before destroying so the chain behind bat is left alone */
    2005           0 :                 bat->next = NULL;
    2006           0 :                 destroy_delta(bat, false);
    2007           0 :                 if (update_conflict)
    2008           0 :                         *update_conflict = true;
    2009           0 :                 return NULL;
    2010             :         }
    2011             :         return bat;
    2012             : }
    2013             : 
    /*
     * update_col_execute: apply an update to *delta, dispatching on is_bat.
     *
     * When is_bat is true, incoming_tids and incoming_values are treated as
     * BAT pointers and passed to delta_update_bat(); an empty tids BAT is a
     * no-op that returns LOG_OK. Otherwise incoming_tids is read as a pointer
     * to a single oid and incoming_values is passed on unchanged to
     * delta_update_val().
     *
     * Returns LOG_OK or the status of the delta_update_* helper.
     */
    2014             : static int
    2015        9355 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
    2016             : {
    2017        9355 :         int ok = LOG_OK;
    2018             : 
    2019        9355 :         if (is_bat) {
    2020        3009 :                 BAT *tids = incoming_tids;
    2021        3009 :                 BAT *values = incoming_values;
    2022        3009 :                 if (BATcount(tids) == 0)
    2023             :                         return LOG_OK;
    2024        3009 :                 ok = delta_update_bat(tr, delta, table, tids, values, is_new);
    2025             :         } else {
    2026        6346 :                 ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
    2027             :         }
    2028             :         return ok;
    2029             : }
    2030             : 
    /*
     * update_col: update values of column c inside transaction tr.
     *
     * tids/upd are BAT pointers when isbat is true, otherwise a pointer to one
     * oid and a pointer to one value (see update_col_execute()).
     *
     * Returns LOG_OK on success or when the tids BAT is empty, LOG_CONFLICT
     * when bind_col_data() flags a conflict, LOG_ERR when c is NULL or
     * binding fails without a conflict, or the status of the executed
     * update / sql_trans_alter_storage().
     */
    2031             : static int
    2032        9597 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
    2033             : {
    2034        9597 :         int res = LOG_OK;
    2035        9597 :         bool update_conflict = false;
                               /* remember the head delta seen before binding, to detect a newly created one below */
    2036        9597 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2037             : 
    2038        9597 :         if (isbat) {
    2039        3248 :                 BAT *t = tids;
    2040        3248 :                 if (!BATcount(t))
    2041             :                         return LOG_OK;
    2042             :         }
    2043             : 
                               /* NOTE(review): c was already used in the odelta initializer above, so this check comes after the first use of c */
    2044        9082 :         if (c == NULL)
    2045             :                 return LOG_ERR;
    2046             : 
    2047        9082 :         if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
    2048           4 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2049             : 
    2050        9078 :         assert(delta && delta->cs.ts == tr->tid);
    2051        9078 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
                               /* a different head delta was returned: register it with the transaction (no log callback for tables that are not logged) */
    2052        9078 :         if (odelta != delta)
    2053        3356 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    2054             : 
    2055        9078 :         odelta = delta;
    2056        9078 :         if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
    2057             :                 return res;
                               /* the update is expected to leave the delta pointer unchanged */
    2058        9078 :         assert(delta == odelta);
    2059        9078 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2060           0 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2061             :         return res;
    2062             : }
    2063             : 
    /*
     * bind_idx_data: index counterpart of bind_col_data(); return the delta of
     * index i that transaction tr should operate on, installing a new delta at
     * the head of i->data when needed.
     *
     * update_conflict: optional out-flag, set to true on a detected conflict.
     * append_idx() passes NULL. Unlike bind_col_data(), the conflict branch
     * here always returns NULL, also when update_conflict is NULL.
     *
     * Returns the delta to use, or NULL on conflict, when timestamp_delta()
     * returns NULL, on allocation failure, or when dup_cs() fails.
     */
    2064             : static sql_delta *
    2065        2446 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
    2066             : {
    2067        2446 :         sql_delta *obat = ATOMIC_PTR_GET(&i->data);
    2068             : 
    2069        2446 :         if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
    2070        2418 :                 return obat;
    2071          28 :         if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
    2072             :                 /* abort */
    2073           0 :                 if (update_conflict)
    2074           0 :                         *update_conflict = true;
    2075           0 :                 return NULL;
    2076             :         }
    2077          28 :         if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
    2078             :                 return NULL;
    2079          28 :         sql_delta* bat = ZNEW(sql_delta);
    2080          28 :         if (!bat)
    2081             :                 return NULL;
    2082          28 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
                               /* the storage type passed to dup_cs() is TYPE_oid when oid_index(i->type) holds, TYPE_lng otherwise */
    2083          33 :         if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
    2084           0 :                 destroy_delta(bat, false);
    2085           0 :                 return NULL;
    2086             :         }
    2087          28 :         bat->cs.ts = tr->tid;
    2088             :         /* only one writer else abort */
    2089          28 :         bat->next = obat;
                               /* publish bat as the new head only if i->data still equals obat (held in bat->next) */
    2090          28 :         if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
    2091           0 :                 bat->next = NULL;
    2092           0 :                 destroy_delta(bat, false);
    2093           0 :                 if (update_conflict)
    2094           0 :                         *update_conflict = true;
    2095           0 :                 return NULL;
    2096             :         }
    2097             :         return bat;
    2098             : }
    2099             : 
    /*
     * update_idx: update values of index i inside transaction tr; the index
     * counterpart of update_col().
     *
     * Returns LOG_OK on success or when the tids BAT is empty, LOG_CONFLICT
     * when bind_idx_data() flags a conflict, LOG_ERR when i is NULL or binding
     * fails without a conflict, or the status of update_col_execute().
     */
    2100             : static int
    2101         776 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
    2102             : {
    2103         776 :         int res = LOG_OK;
    2104         776 :         bool update_conflict = false;
                               /* remember the head delta seen before binding, to detect a newly created one below */
    2105         776 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
    2106             : 
    2107         776 :         if (isbat) {
    2108         776 :                 BAT *t = tids;
    2109         776 :                 if (!BATcount(t))
    2110             :                         return LOG_OK;
    2111             :         }
    2112             : 
                               /* NOTE(review): i was already used in the odelta initializer above, so this check comes after the first use of i */
    2113         277 :         if (i == NULL)
    2114             :                 return LOG_ERR;
    2115             : 
    2116         277 :         if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
    2117           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    2118             : 
    2119         277 :         assert(delta && delta->cs.ts == tr->tid);
                               /* a different head delta was returned: register it with the transaction (no log callback for tables that are not logged) */
    2120         277 :         if (odelta != delta)
    2121          22 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    2122             : 
    2123         277 :         odelta = delta;
    2124         277 :         res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
    2125         277 :         assert(delta == odelta);
    2126             :         return res;
    2127             : }
    2128             : 
    /*
     * delta_append_bat: write the values of BAT i into the column BAT of *batp
     * (bat->cs.bid), holding the column lock for `id` while doing so.
     *
     * offset:       start position used when offsets is NULL.
     * offsets:      optional BAT of target positions; asserted to have the
     *               same count as i.
     * storage_type: forwarded to for_append_bat() for ST_FOR columns.
     * istemp:       enables the shortcut that adopts i as the column BAT when
     *               the target is still empty; callers in this file pass
     *               isTempTable() of the owning table via append_col_execute().
     *
     * Returns LOG_OK (also for an empty i) or LOG_ERR.
     */
    2129             : static int
    2130      148081 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type, bool istemp)
    2131             : {
                               /* oi is the BAT actually appended; it differs from i once unmasked or re-encoded and is then released here */
    2132      148081 :         BAT *b, *oi = i;
    2133      148081 :         int err = 0;
    2134      148081 :         sql_delta *bat = *batp;
    2135             : 
    2136      148081 :         assert(!offsets || BATcount(offsets) == BATcount(i));
    2137      148081 :         if (!BATcount(i))
    2138             :                 return LOG_OK;
                               /* msk-typed or mask-candidate input is first converted with BATunmask() */
    2139      148081 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
    2140             :                 return LOG_ERR;
    2141             : 
    2142      148081 :         lock_column(tr->store, id);
    2143      149107 :         if (bat->cs.st == ST_DICT) {
                                       /* dict_append_bat() may replace *batp, so reload bat afterwards */
    2144          13 :                 BAT *ni = dict_append_bat(tr, batp, oi);
    2145          13 :                 bat = *batp;
    2146          13 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2147           0 :                         bat_destroy(oi);
    2148          13 :                 oi = ni;
    2149          13 :                 if (!oi) {
    2150           0 :                         unlock_column(tr->store, id);
    2151           0 :                         return LOG_ERR;
    2152             :                 }
    2153             :         }
    2154      149107 :         if (bat->cs.st == ST_FOR) {
    2155           0 :                 BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
    2156           0 :                 bat = *batp;
    2157           0 :                 if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
    2158           0 :                         bat_destroy(oi);
    2159           0 :                 oi = ni;
    2160           0 :                 if (!oi) {
    2161           0 :                         unlock_column(tr->store, id);
    2162           0 :                         return LOG_ERR;
    2163             :                 }
    2164             :         }
    2165             : 
    2166      149107 :         b = temp_descriptor(bat->cs.bid);
    2167      149069 :         if (b == NULL) {
    2168           0 :                 unlock_column(tr->store, id);
    2169           0 :                 if (oi != i)
    2170           0 :                         bat_destroy(oi);
    2171           0 :                 return LOG_ERR;
    2172             :         }
                               /* five cases: adopt i as the column BAT (empty target, no updates, istemp); plain append at the current end;
                                * positional update from offset; dense offsets that start at the current end (append); general BATupdate() */
    2173      149069 :         if (istemp && !offsets && offset == 0 && BATcount(b) == 0 && bat->cs.ucnt == 0) {
    2174          14 :                 bat_set_access(i, BAT_READ);
    2175          14 :                 if (bat->cs.bid)
    2176          14 :                         temp_destroy(bat->cs.bid);
    2177          14 :                 i = transfer_to_systrans(i);
    2178          14 :                 bat->cs.bid = temp_create(i);
    2179      149055 :         } else if (!offsets && offset == b->hseqbase+BATcount(b)) {
    2180      148867 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2181         263 :                         err = 1;
    2182         176 :         } else if (!offsets) {
    2183         176 :                 if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
    2184         263 :                         err = 1;
    2185          12 :         } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
    2186           0 :                 if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
    2187         263 :                         err = 1;
    2188          12 :         } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
    2189           0 :                         err = 1;
    2190             :         }
    2191      148810 :         bat_destroy(b);
    2192      148529 :         unlock_column(tr->store, id);
    2193             : 
    2194      149659 :         if (oi != i)
    2195          13 :                 bat_destroy(oi);
    2196      149659 :         return (err)?LOG_ERR:LOG_OK;
    2197             : }
    2198             : 
    2199             : // Look at the offsets and find where the replacements end and the appends begin.
                       // Returns the number of leading entries of offsets that address existing
                       // positions (below bcnt); 0 when offsets is empty, the full count when the
                       // last offset is below bcnt.
                       // NOTE(review): only the last entry is inspected, so this presumably assumes
                       // offsets is sorted ascending and the appended positions are contiguous
                       // starting at bcnt (the assert below checks the latter) -- confirm with callers.
    2200             : static BUN
    2201           0 : start_of_appends(BAT *offsets, BUN bcnt)
    2202             : {
    2203           0 :         BUN ocnt = BATcount(offsets);
    2204           0 :         if (ocnt == 0)
    2205             :                 return 0;
    2206             : 
    2207           0 :         BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
    2208           0 :         if (highest < bcnt)
    2209             :                 // all are replacements
    2210             :                 return ocnt;
    2211             : 
    2212             :         // reason backward to find the first append.
    2213             :         // Suppose offsets has 15 entries, bcnt == 100
    2214             :         // and the highest offset in offsets is 109.
    2215           0 :         BUN new_bcnt = highest + 1; // 110
    2216           0 :         BUN nappends = new_bcnt - bcnt; // 10
    2217           0 :         BUN nreplacements = ocnt - nappends; // 5
    2218             : 
    2219             :         // The first append should be to position bcnt
    2220           0 :         assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
    2221             : 
    2222             :         return nreplacements;
    2223             : }
    2224             : 
    2225             : 
    /*
     * delta_append_val: write cnt values from the array i into the column BAT
     * of *batp (bat->cs.bid), holding the column lock for `id` throughout.
     *
     * offset:       target start position; asserted to be oid_nil when offsets
     *               is given, in which case it is reset to the current count.
     * offsets:      optional BAT of target positions; leading entries that
     *               address existing rows are handled as replacements.
     * storage_type: forwarded to for_append_val() for ST_FOR columns.
     * tt:           forwarded to for_append_val().
     *
     * For ST_DICT / ST_FOR columns the helpers may return a new array; it is
     * released with GDKfree() on every exit path when it differs from the
     * caller's pointer.
     *
     * Returns LOG_OK or LOG_ERR.
     */
    2226             : static int
    2227    14999524 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
    2228             : {
                               /* oi keeps the caller's pointer so a helper-allocated replacement can be told apart and freed */
    2229    14999524 :         void *oi = i;
    2230    14999524 :         BAT *b;
    2231    14999524 :         lock_column(tr->store, id);
    2232    15027331 :         sql_delta *bat = *batp;
    2233             : 
    2234    15027331 :         if (bat->cs.st == ST_DICT) {
    2235             :                 /* possibly a new array is returned */
    2236           4 :                 i = dict_append_val(tr, batp, i, cnt);
    2237           4 :                 bat = *batp;
    2238           4 :                 if (!i) {
    2239           0 :                         unlock_column(tr->store, id);
    2240           0 :                         return LOG_ERR;
    2241             :                 }
    2242             :         }
    2243    15027331 :         if (bat->cs.st == ST_FOR) {
    2244             :                 /* possibly a new array is returned */
    2245           1 :                 i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
    2246           1 :                 bat = *batp;
    2247           1 :                 if (!i) {
    2248           0 :                         unlock_column(tr->store, id);
    2249           0 :                         return LOG_ERR;
    2250             :                 }
    2251             :         }
    2252             : 
    2253    15027331 :         b = temp_descriptor(bat->cs.bid);
    2254    15026375 :         if (b == NULL) {
    2255           0 :                 if (i != oi)
    2256           0 :                         GDKfree(i);
    2257           0 :                 unlock_column(tr->store, id);
    2258           0 :                 return LOG_ERR;
    2259             :         }
    2260    15026375 :         BUN bcnt = BATcount(b);
    2261             : 
    2262    15026375 :         if (offsets) {
    2263             :                 // The first few might be replacements while later items might be appends.
    2264             :                 // Handle the replacements here while leaving the appends to the code below.
    2265           0 :                 BUN nreplacements = start_of_appends(offsets, bcnt);
    2266             : 
    2267           0 :                 oid *start = Tloc(offsets, 0);
    2268           0 :                 if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
    2269           0 :                         bat_destroy(b);
    2270           0 :                         if (i != oi)
    2271           0 :                                 GDKfree(i);
    2272           0 :                         unlock_column(tr->store, id);
    2273           0 :                         return LOG_ERR;
    2274             :                 }
    2275             : 
    2276             :                 // Replacements have been handled. The rest are appends.
                                       // NOTE(review): i is not advanced past the nreplacements values consumed above
                                       // before the append code below reads from it again -- confirm this is intended.
    2277           0 :                 assert(offset == oid_nil);
    2278           0 :                 offset = bcnt;
    2279           0 :                 cnt -= nreplacements;
    2280             :         }
    2281             : 
                               /* part of the range overlaps existing rows: overwrite those in place first (ccnt values) */
    2282    15026375 :         if (bcnt > offset){
    2283      530672 :                 size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
    2284      530672 :                 if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
    2285           0 :                         bat_destroy(b);
    2286           0 :                         if (i != oi)
    2287           0 :                                 GDKfree(i);
    2288           0 :                         unlock_column(tr->store, id);
    2289           0 :                         return LOG_ERR;
    2290             :                 }
    2291      530810 :                 cnt -= ccnt;
    2292      530810 :                 offset += ccnt;
    2293             :         }
                               /* whatever is left goes past the current end of b */
    2294    15026513 :         if (cnt) {
    2295    14495337 :                 if (BATcount(b) < offset) { /* add space */
    2296        6949 :                         BUN d = offset - BATcount(b);
    2297        6949 :                         if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
    2298           0 :                                 bat_destroy(b);
    2299           0 :                                 if (i != oi)
    2300           0 :                                         GDKfree(i);
    2301           0 :                                 unlock_column(tr->store, id);
    2302           0 :                                 return LOG_ERR;
    2303             :                         }
    2304             :                 }
    2305    14495334 :                 if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
    2306           0 :                         bat_destroy(b);
    2307           0 :                         if (i != oi)
    2308           0 :                                 GDKfree(i);
    2309           0 :                         unlock_column(tr->store, id);
    2310           0 :                         return LOG_ERR;
    2311             :                 }
    2312             :         }
    2313    15020177 :         bat_destroy(b);
    2314    15013307 :         if (i != oi)
    2315           4 :                 GDKfree(i);
    2316    15013307 :         unlock_column(tr->store, id);
    2317    15013307 :         return LOG_OK;
    2318             : }
    2319             : 
    /*
     * dup_storage: initialise storage `bat` from `obat`. A fresh segment list
     * is created with new_segments(tr, 0) and the column storage is derived
     * from obat->cs through dup_cs() with type TYPE_msk.
     *
     * Returns LOG_ERR when new_segments() fails, otherwise the dup_cs() status.
     */
    2320             : static int
    2321       26024 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
    2322             : {
    2323       26024 :         if (!(bat->segs = new_segments(tr, 0)))
    2324             :                 return LOG_ERR;
    2325       26019 :         return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
    2326             : }
    2327             : 
    /*
     * append_col_execute: append incoming_data to *delta, dispatching on isbat.
     *
     * Clears the delta's cs.merged flag first. When isbat is true,
     * incoming_data is treated as a BAT and, if non-empty, handed to
     * delta_append_bat() (isnew is passed as its istemp argument); otherwise
     * incoming_data, cnt and tt are handed to delta_append_val().
     *
     * Returns LOG_OK or the status of the delta_append_* helper.
     */
    2328             : static int
    2329    15146948 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type, bool isnew)
    2330             : {
    2331    15146948 :         int ok = LOG_OK;
    2332             : 
    2333    15146948 :         if ((*delta)->cs.merged)
    2334       34984 :                 (*delta)->cs.merged = false; /* TODO needs to move */
    2335    15146948 :         if (isbat) {
    2336      147831 :                 BAT *bat = incoming_data;
    2337             : 
    2338      147831 :                 if (BATcount(bat))
    2339      148023 :                         ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type, isnew);
    2340             :         } else {
    2341    14999117 :                 ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
    2342             :         }
    2343    15178790 :         return ok;
    2344             : }
    2345             : 
    /*
     * append_col: append data to column c inside transaction tr.
     *
     * data is a BAT when isbat is true (an empty BAT is a no-op), otherwise an
     * array of cnt values of type tpe. offset/offsets are forwarded to
     * append_col_execute().
     *
     * Returns LOG_OK, LOG_ERR when bind_col_data() fails, LOG_CONFLICT when a
     * replaced delta cannot be installed, or the status of the append /
     * sql_trans_alter_storage().
     */
    2346             : static int
    2347    15159551 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2348             : {
    2349    15159551 :         int res = LOG_OK;
                               /* this initial odelta value is overwritten below before it is read */
    2350    15159551 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
    2351             : 
    2352    15159551 :         if (isbat) {
    2353      148247 :                 BAT *t = data;
    2354      148247 :                 if (!BATcount(t))
    2355             :                         return LOG_OK;
    2356             :         }
    2357             : 
                               /* NULL conflict flag: bind in append mode */
    2358    15158835 :         if ((delta = bind_col_data(tr, c, NULL)) == NULL)
    2359             :                 return LOG_ERR;
    2360             : 
    2361    15158986 :         assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
    2362             : 
    2363    15158986 :         odelta = delta;
    2364    15158986 :         if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type, isTempTable(c->t))) != LOG_OK)
    2365             :                 return res;
                               /* the append replaced the delta: chain the old one behind it and install the new one as head of c->data via CAS */
    2366    15175812 :         if (odelta != delta) {
    2367           0 :                 delta->next = odelta;
    2368           0 :                 if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
    2369           0 :                         delta->next = NULL;
    2370           0 :                         destroy_delta(delta, false);
    2371           0 :                         return LOG_CONFLICT;
    2372             :                 }
    2373             :         }
    2374    15175812 :         if (delta->cs.st == ST_DEFAULT && c->storage_type)
    2375           1 :                 res = sql_trans_alter_storage(tr, c, NULL);
    2376             :         return res;
    2377             : }
    2378             : 
    /*
     * append_idx: append data to index i inside transaction tr; the index
     * counterpart of append_col(). The bound delta is asserted to use
     * ST_DEFAULT storage and no storage_type is passed on.
     *
     * Returns LOG_OK (also for an empty BAT), LOG_ERR when bind_idx_data()
     * fails, or the status of append_col_execute().
     */
    2379             : static int
    2380        2175 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
    2381             : {
    2382        2175 :         int res = LOG_OK;
    2383        2175 :         sql_delta *delta;
    2384             : 
    2385        2175 :         if (isbat) {
    2386        1003 :                 BAT *t = data;
    2387        1003 :                 if (!BATcount(t))
    2388             :                         return LOG_OK;
    2389             :         }
    2390             : 
                               /* NULL conflict flag: bind in append mode */
    2391        2163 :         if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
    2392             :                 return LOG_ERR;
    2393             : 
    2394        2163 :         assert(delta->cs.st == ST_DEFAULT);
    2395             : 
    2396        2163 :         res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL, isTempTable(i->t));
    2397        2163 :         return res;
    2398             : }
    2399             : 
    /*
     * deletes_conflict_updates: coarse check whether a delete on table t by
     * transaction tr collides with updates on any of its columns.
     *
     * Returns 1 as soon as a column's head delta fails VALID_4_READ for tr and
     * has a non-zero update count (cs.ucnt); returns 0 otherwise. rid and cnt
     * are currently ignored, so the check is per table rather than per row
     * range.
     */
    2400             : static int
    2401       73428 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
    2402             : {
                               /* NOTE(review): err is never assigned after this, so the `!err` loop condition below is always true */
    2403       73428 :         int err = 0;
    2404             : 
    2405             :         /* TODO check for conflicting updates */
    2406       73428 :         (void)rid;
    2407       73428 :         (void)cnt;
    2408      542838 :         for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
    2409      469410 :                 sql_column *c = n->data;
    2410      469410 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    2411             : 
    2412             :                 /* check for active updates */
    2413      469410 :                 if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
    2414             :                         return 1;
    2415             :         }
    2416             :         return 0;
    2417             : }
    2418             : 
    /*
     * storage_delete_val: mark the single row rid of table t as deleted for
     * transaction tr by splitting the segment that contains it.
     *
     * The table lock is held while the segment list is walked and split. When
     * segments_in_transaction() reported false on entry, the storage is
     * registered with the transaction afterwards (no log callback for tables
     * that are not logged).
     *
     * Returns LOG_CONFLICT when the segment fails SEG_VALID_4_DELETE or
     * deletes_conflict_updates() reports a conflict, LOG_ERR when
     * split_segment() fails, LOG_OK otherwise.
     * NOTE(review): when no segment contains rid the loop simply ends and
     * LOG_OK is still returned -- confirm callers guarantee a valid rid.
     */
    2419             : static int
    2420       70460 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
    2421             : {
    2422       70460 :         int in_transaction = segments_in_transaction(tr, t);
    2423             : 
    2424       70460 :         lock_table(tr->store, t->base.id);
    2425             :         /* find segment of rid, split, mark new segment deleted (for tr->tid) */
                               /* p trails seg as the previous segment and is handed to split_segment() */
    2426       70460 :         segment *seg = s->segs->h, *p = NULL;
    2427    51769916 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2428    51769916 :                 if (seg->start <= rid && seg->end > rid) {
    2429       70460 :                         if (!SEG_VALID_4_DELETE(seg,tr)) {
    2430           4 :                                 unlock_table(tr->store, t->base.id);
    2431           4 :                                 return LOG_CONFLICT;
    2432             :                         }
    2433       70456 :                         if (deletes_conflict_updates( tr, t, rid, 1)) {
    2434           0 :                                 unlock_table(tr->store, t->base.id);
    2435           0 :                                 return LOG_CONFLICT;
    2436             :                         }
    2437       70456 :                         if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
    2438           0 :                                 unlock_table(tr->store, t->base.id);
    2439           0 :                                 return LOG_ERR;
    2440             :                         }
    2441             :                         break;
    2442             :                 }
    2443             :         }
    2444       70456 :         unlock_table(tr->store, t->base.id);
    2445       70456 :         if (!in_transaction)
    2446       11921 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2447             :         return LOG_OK;
    2448             : }
    2449             : 
    /*
     * seg_delete_range: mark rows [start, start+cnt) as deleted for tr,
     * walking the segment list from *Seg and splitting segments as needed.
     *
     * Seg is in/out: after each successful split it is set to the segment
     * returned by split_segment(), so a following call can resume from there.
     * No lock is taken here; the callers visible in this file take the table
     * lock around the call.
     *
     * Returns LOG_CONFLICT when a segment fails SEG_VALID_4_DELETE or
     * deletes_conflict_updates() reports a conflict, LOG_ERR when
     * split_segment() fails, LOG_OK otherwise.
     */
    2450             : static int
    2451        2970 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
    2452             : {
    2453        2970 :         segment *seg = *Seg, *p = NULL;
    2454        9713 :         for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    2455        9711 :                 if (seg->start <= start && seg->end > start) {
                                               /* lcnt: the part of the remaining range that lies inside this segment */
    2456        3020 :                         size_t lcnt = cnt;
    2457        3020 :                         if (start+lcnt > seg->end)
    2458          54 :                                 lcnt = seg->end-start;
                                               /* SEG_IS_DELETED holds for tr: skip this part and move on to the next segment */
    2459        3020 :                         if (SEG_IS_DELETED(seg, tr)) {
    2460          47 :                                 start += lcnt;
    2461          47 :                                 cnt -= lcnt;
    2462          47 :                                 continue;
    2463        2973 :                         } else if (!SEG_VALID_4_DELETE(seg, tr))
    2464           1 :                                 return LOG_CONFLICT;
    2465        2972 :                         if (deletes_conflict_updates( tr, t, start, lcnt))
    2466             :                                 return LOG_CONFLICT;
    2467        2972 :                         *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
    2468        2972 :                         if (!seg)
    2469             :                                 return LOG_ERR;
    2470        2972 :                         start += lcnt;
    2471        2972 :                         cnt -= lcnt;
    2472             :                 }
                                       /* stop once the remaining range ends at or before the end of the current segment */
    2473        9663 :                 if (start+cnt <= seg->end)
    2474             :                         break;
    2475             :         }
    2476             :         return LOG_OK;
    2477             : }
    2478             : 
    /*
     * delete_range: mark rows [start, start+cnt) of storage s as deleted for
     * tr, starting the search at the head of the segment list.
     * Thin wrapper around seg_delete_range(); returns its status.
     */
    2479             : static int
    2480         604 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
    2481             : {
    2482         604 :         segment *seg = s->segs->h;
    2483         604 :         return seg_delete_range(tr, t, s, &seg, start, cnt);
    2484             : }
    2485             : 
    2486             : static int
    2487         297 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
    2488             : {
    2489         297 :         int in_transaction = segments_in_transaction(tr, t);
    2490         297 :         BAT *oi = i;    /* update ids */
    2491         297 :         int ok = LOG_OK;
    2492             : 
    2493         297 :         if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
    2494             :                 return LOG_ERR;
    2495         297 :         if (BATcount(i)) {
    2496         522 :                 if (BATtdense(i)) {
    2497         225 :                         size_t start = i->tseqbase;
    2498         225 :                         size_t cnt = BATcount(i);
    2499             : 
    2500         225 :                         lock_table(tr->store, t->base.id);
    2501         225 :                         ok = delete_range(tr, t, s, start, cnt);
    2502         225 :                         unlock_table(tr->store, t->base.id);
    2503          72 :                 } else if (complex_cand(i)) {
    2504           0 :                         struct canditer ci;
    2505           0 :                         oid f = 0, l = 0, cur = 0;
    2506             : 
    2507           0 :                         canditer_init(&ci, NULL, i);
    2508           0 :                         cur = f = canditer_next(&ci);
    2509             : 
    2510           0 :                         lock_table(tr->store, t->base.id);
    2511           0 :                         if (!is_oid_nil(f)) {
    2512           0 :                                 segment *seg = s->segs->h;
    2513           0 :                                 for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
    2514           0 :                                         if (cur+1 == l) {
    2515           0 :                                                 cur++;
    2516           0 :                                                 continue;
    2517             :                                         }
    2518           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2519           0 :                                         f = cur = l;
    2520             :                                 }
    2521           0 :                                 if (ok == LOG_OK)
    2522           0 :                                         ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
    2523             :                         }
    2524           0 :                         unlock_table(tr->store, t->base.id);
    2525             :                 } else {
    2526          72 :                         if (!i->tsorted) {
    2527           0 :                                 assert(oi == i);
    2528           0 :                                 BAT *ni = NULL;
    2529           0 :                                 if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
    2530           0 :                                         ok = LOG_ERR;
    2531           0 :                                 if (ni)
    2532           0 :                                         i = ni;
    2533             :                         }
    2534          72 :                         assert(i->tsorted);
    2535          72 :                         BUN icnt = BATcount(i);
    2536          72 :                         BATiter ii = bat_iterator(i);
    2537          72 :                         oid *o = ii.base, n = o[0]+1;
    2538          72 :                         size_t lcnt = 1;
    2539             : 
    2540          72 :                         lock_table(tr->store, t->base.id);
    2541          72 :                         segment *seg = s->segs->h;
    2542       24025 :                         for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
    2543       23953 :                                 if (o[i] == n) {
    2544       23610 :                                         lcnt++;
    2545       23610 :                                         n++;
    2546             :                                 } else {
    2547         343 :                                         ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2548         343 :                                         lcnt = 0;
    2549             :                                 }
    2550       23953 :                                 if (!lcnt) {
    2551         343 :                                         n = o[i]+1;
    2552         343 :                                         lcnt = 1;
    2553             :                                 }
    2554             :                         }
    2555          72 :                         bat_iterator_end(&ii);
    2556          72 :                         if (lcnt && ok == LOG_OK)
    2557          72 :                                 ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
    2558          72 :                         unlock_table(tr->store, t->base.id);
    2559             :                 }
    2560             :         }
    2561         297 :         if (i != oi)
    2562          25 :                 bat_destroy(i);
    2563             :         // assert
    2564         297 :         if (!in_transaction)
    2565         267 :                 trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    2566             :         return ok;
    2567             : }
    2568             : 
    /*
     * Drop one reference on the segment list `s`.  If sql_ref_dec() still
     * returns a positive count the list is left alone; otherwise every segment
     * reachable from s->h is freed, followed by the list header itself.
     * A NULL `s` is accepted and ignored.
     */
    2569             : static void
    2570       50775 : destroy_segments(segments *s)
    2571             : {
    2572       50775 :         if (!s || sql_ref_dec(&s->r) > 0)
    2573           0 :                 return;
    2574       50775 :         segment *seg = s->h;
    2575      109931 :         while(seg) {
                                       /* read the successor first: seg itself is freed below */
    2576       59156 :                 segment *n = ATOMIC_PTR_GET(&seg->next);
    2577       59156 :                 ATOMIC_PTR_DESTROY(&seg->next);
    2578       59156 :                 _DELETE(seg);
    2579       59156 :                 seg = n;
    2580             :         }
    2581       50775 :         _DELETE(s);
    2582             : }
    2583             : 
    /*
     * Drop one reference on the storage object `bat` (must not be NULL, it is
     * dereferenced unconditionally).  While ATOMIC_DEC() still returns a
     * positive count nothing else happens.  Otherwise the chain hanging off
     * bat->next is destroyed recursively, the segment list is released through
     * destroy_segments(), each non-zero BAT id in bat->cs (uibid, uvbid, bid)
     * is handed to temp_destroy(), and the object itself is freed.
     */
    2584             : static void
    2585       52115 : destroy_storage(storage *bat)
    2586             : {
    2587       52115 :         if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
    2588             :                 return;
    2589       50635 :         if (bat->next)
    2590        6061 :                 destroy_storage(bat->next);
    2591       50635 :         destroy_segments(bat->segs);
    2592       50635 :         if (bat->cs.uibid)
    2593       30809 :                 temp_destroy(bat->cs.uibid);
    2594       50635 :         if (bat->cs.uvbid)
    2595       30809 :                 temp_destroy(bat->cs.uvbid);
    2596       50635 :         if (bat->cs.bid)
    2597       50635 :                 temp_destroy(bat->cs.bid);
                               /* clear the ids before the structure is freed */
    2598       50635 :         bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
    2599       50635 :         _DELETE(bat);
    2600             : }
    2601             : 
    /*
     * Scan the segment list `segs` for a segment that transaction `tr` may not
     * read.  Returns 1 as soon as one is found, 0 if the whole list passes.
     *
     * uncommitted != 0: every segment must satisfy VALID_4_READ(s->ts, tr).
     * uncommitted == 0: only segments whose ts is below TRANSACTION_ID_BASE are
     *                   tested; segments at or above it are skipped
     *                   (NOTE(review): presumably those are still-uncommitted
     *                   transaction ids -- confirm against the ts encoding).
     */
    2602             : static int
    2603      158913 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
    2604             : {
    2605      158913 :         if (uncommitted) {
    2606      426114 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2607      281747 :                         if (!VALID_4_READ(s->ts,tr))
    2608             :                                 return 1;
    2609             :         } else {
    2610      119803 :                 for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
    2611      106224 :                         if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
    2612             :                                 return 1;
    2613             :         }
    2614             : 
    2615             :         return 0;
    2616             : }
    2617             : 
    2618             : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
    2619             : 
    /*
     * Return the storage object of table `t` that transaction `tr` should use.
     *
     * clear == NULL: plain lookup.  Returns the current head of t->data, or
     *   NULL when that head carries a timestamp >= TRANSACTION_ID_BASE that is
     *   neither tr's own tid nor (for a nested transaction) a version of its
     *   parent.
     *
     * clear != NULL: additionally installs a fresh storage version marked
     *   `cleared`, stamped with tr->tid and chained in front of the old one.
     *   The new head is published with a compare-and-swap on t->data.
     *
     * Returns NULL on failure.  *clear is set to true on the conflict paths
     * (foreign timestamp, segments_conflict(), lost compare-and-swap); it is
     * left untouched when timestamp_storage(), ZNEW() or dup_storage() fail.
     */
    2620             : storage *
    2621     2133016 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
    2622             : {
    2623     2133016 :         storage *obat;
    2624             : 
    2625     2133016 :         obat = ATOMIC_PTR_GET(&t->data);
    2626             : 
    2627     2133016 :         if (obat->cs.ts != tr->tid)
    2628     1472121 :                 if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
    2629     1472066 :                         if (obat->cs.ts >= TRANSACTION_ID_BASE) {
    2630             :                                 /* abort */
    2631       15168 :                                 if (clear)
    2632       15168 :                                         *clear = true;
    2633       15168 :                                 return NULL;
    2634             :                         }
    2635             : 
    2636     2117848 :         if (!clear)
    2637             :                 return obat;
    2638             : 
    2639             :         /* remainder is only to handle clear */
    2640       26598 :         if (segments_conflict(tr, obat->segs, 1)) {
    2641         577 :                 *clear = true;
    2642         577 :                 return NULL;
    2643             :         }
    2644       26023 :         if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
    2645             :                 return NULL;
    2646       26025 :         storage *bat = ZNEW(storage);
    2647       26025 :         if (!bat)
    2648             :                 return NULL;
    2649       26025 :         ATOMIC_INIT(&bat->cs.refcnt, 1);
    2650       26025 :         if (dup_storage(tr, obat, bat) != LOG_OK) {
    2651           0 :                 destroy_storage(bat);
    2652           0 :                 return NULL;
    2653             :         }
    2654       26026 :         bat->cs.cleared = true;
    2655       26026 :         bat->cs.ts = tr->tid;
    2656             :         /* only one writer else abort */
    2657       26026 :         bat->next = obat;
                               /* bat->next doubles as the expected old value of the swap */
    2658       26026 :         if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
                                       /* unlink first: destroy_storage() follows ->next and must not release the old chain */
    2659          17 :                 bat->next = NULL;
    2660          17 :                 destroy_storage(bat);
    2661          17 :                 if (clear)
    2662          17 :                         *clear = true;
    2663          17 :                 return NULL;
    2664             :         }
    2665             :         return bat;
    2666             : }
    2667             : 
    /*
     * Delete rows from table `t` on behalf of transaction `tr`.
     *
     * ib/isbat: when isbat is true, `ib` is a BAT * listing the rows and the
     *           work is done by storage_delete_bat(); otherwise `ib` points to
     *           a single oid that is passed to storage_delete_val().
     *
     * Returns LOG_OK immediately for an empty BAT, LOG_ERR when `t` is NULL or
     * bind_del_data() yields no storage, and otherwise the result of the
     * storage_delete_* call.
     */
    2668             : static int
    2669       70796 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
    2670             : {
    2671       70796 :         int ok = LOG_OK;
    2672       70796 :         BAT *b = ib;
    2673       70796 :         storage *bat;
    2674             : 
    2675       70796 :         if (isbat && !BATcount(b))
    2676             :                 return ok;
    2677             : 
    2678       70757 :         if (t == NULL)
    2679             :                 return LOG_ERR;
    2680             : 
    2681       70757 :         if ((bat = bind_del_data(tr, t, NULL)) == NULL)
    2682             :                 return LOG_ERR;
    2683             : 
    2684       70757 :         if (isbat)
    2685         297 :                 ok = storage_delete_bat(tr, t, bat, ib);
    2686             :         else
    2687       70460 :                 ok = storage_delete_val(tr, t, bat, *(oid*)ib);
    2688             :         return ok;
    2689             : }
    2690             : 
    /*
     * Estimate the number of distinct values in column `c`.
     *
     * Returns 0 when c->t is not a table.  Returns 1 when no delta is found by
     * col_timestamp_delta() or when the table has no storage / no tail
     * segment.  Otherwise the row count is taken from the `end` of the tail
     * segment: for a non-zero count the column is bound with QUICK access and
     * BATguess_uniques() supplies the estimate (0 if the bind fails); for a
     * zero count 0 is returned.
     */
    2691             : static size_t
    2692           0 : dcount_col(sql_trans *tr, sql_column *c)
    2693             : {
    2694           0 :         sql_delta *b;
    2695             : 
    2696           0 :         if (!isTable(c->t))
    2697             :                 return 0;
    2698           0 :         b = col_timestamp_delta(tr, c);
    2699           0 :         if (!b)
    2700             :                 return 1;
    2701             : 
    2702           0 :         storage *s = ATOMIC_PTR_GET(&c->t->data);
    2703           0 :         if (!s || !s->segs->t)
    2704             :                 return 1;
    2705           0 :         size_t cnt = s->segs->t->end;
    2706           0 :         if (cnt) {
    2707           0 :                 BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
    2708           0 :                 size_t dcnt = 0;
    2709             : 
    2710           0 :                 if (v)
    2711           0 :                         dcnt = BATguess_uniques(v, NULL);
    2712           0 :                 return dcnt;
    2713             :         }
    2714             :         return cnt;
    2715             : }
    2716             : 
    /*
     * Replace a view BAT by its parent.  When `b` is a view it is released
     * with bat_destroy() and a descriptor of the parent is returned instead,
     * obtained through quick_descriptor() if `quick` is set and through
     * temp_descriptor() otherwise; NULL is returned if that lookup fails (the
     * incoming `b` has already been released at that point).  A non-view `b`
     * is returned unchanged.
     */
    2717             : static BAT *
    2718     3674216 : bind_no_view(BAT *b, bool quick)
    2719             : {
    2720     3674216 :         if (isVIEW(b)) { /* If it is a view get the parent BAT */
    2721     3672287 :                 BAT *nb = BBP_desc(VIEWtparent(b));
    2722     3672287 :                 bat_destroy(b);
    2723     3672916 :                 if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
    2724             :                         return NULL;
    2725             :         }
    2726             :         return b;
    2727             : }
    2728             : 
    /*
     * Store caller-supplied statistics on column `c`.  Each of unique_est,
     * min and max is optional (NULL means "leave as is").
     *
     * Returns 0 without doing anything when `c` is NULL, has no data, does not
     * belong to a table or the table has no schema.  Otherwise, with the
     * column lock held:
     *   - unique_est is written to the tunique_est field of the column's BAT
     *     (views resolved to their parent), but only for ST_DEFAULT storage;
     *   - min / max replace c->min / c->max with a freshly allocated copy of
     *     ATOMlen() bytes; if the allocation fails the field is left NULL.
     *
     * The return value is 1 when at least one of min/max was stored and 0
     * otherwise; setting only unique_est therefore returns 0.
     */
    2729             : static int
    2730           0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
    2731             : {
    2732           0 :         int ok = 0;
    2733           0 :         assert(tr->active);
    2734           0 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2735           0 :                 return 0;
    2736           0 :         lock_column(tr->store, c->base.id);
    2737           0 :         if (unique_est) {
    2738           0 :                 sql_delta *d;
    2739           0 :                 if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
    2740           0 :                         BAT *b;
    2741           0 :                         if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
    2742           0 :                                 MT_lock_set(&b->theaplock);
    2743           0 :                                 b->tunique_est = *unique_est;
    2744           0 :                                 MT_lock_unset(&b->theaplock);
    2745           0 :                                 bat_destroy(b);
    2746             :                         }
    2747             :                 }
    2748             :         }
    2749           0 :         if (min) {
    2750           0 :                 _DELETE(c->min);
    2751           0 :                 size_t minlen = ATOMlen(c->type.type->localtype, min);
    2752           0 :                 if ((c->min = GDKmalloc(minlen)) != NULL) {
    2753           0 :                         memcpy(c->min, min, minlen);
    2754           0 :                         ok = 1;
    2755             :                 }
    2756             :         }
    2757           0 :         if (max) {
    2758           0 :                 _DELETE(c->max);
    2759           0 :                 size_t maxlen = ATOMlen(c->type.type->localtype, max);
    2760           0 :                 if ((c->max = GDKmalloc(maxlen)) != NULL) {
    2761           0 :                         memcpy(c->max, max, maxlen);
    2762           0 :                         ok = 1;
    2763             :                 }
    2764             :         }
    2765           0 :         unlock_column(tr->store, c->base.id);
    2766           0 :         return ok;
    2767             : }
    2768             : 
    /*
     * Make sure c->min and c->max hold a cached copy of the column's minimum
     * and maximum value.
     *
     * Returns 1 when both are available (already cached, or copied now from
     * the BAT's minpos/maxpos entries) and 0 otherwise: `c` unusable (NULL, no
     * data, not a table, no schema), ST_FOR storage, bind failure, one of the
     * two positions unknown, or allocation failure.
     *
     * The cached pair is checked once without the column lock and again after
     * taking it, so the values are only recomputed while holding the lock.
     */
    2769             : static int
    2770           9 : min_max_col(sql_trans *tr, sql_column *c)
    2771             : {
    2772           9 :         int ok = 0;
    2773           9 :         BAT *b = NULL;
    2774           9 :         sql_delta *d = NULL;
    2775             : 
    2776           9 :         assert(tr->active);
    2777           9 :         if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
    2778           0 :                 return 0;
    2779           9 :         if (c->min && c->max)
    2780             :                 return 1;
    2781           9 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2782           9 :                 if (d->cs.st == ST_FOR)
    2783             :                         return 0;
                                       /* dictionary-compressed columns are bound with RD_EXT, all others with RDONLY */
    2784           9 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2785           9 :                 lock_column(tr->store, c->base.id);
    2786           9 :                 if (c->min && c->max) {
    2787           0 :                         unlock_column(tr->store, c->base.id);
    2788           0 :                         return 1;
    2789             :                 }
                                       /* drop a half-filled cache (only one of the two set) before recomputing */
    2790           9 :                 _DELETE(c->min);
    2791           9 :                 _DELETE(c->max);
    2792           9 :                 if ((b = bind_col(tr, c, access))) {
    2793           9 :                         if (!(b = bind_no_view(b, false))) {
    2794           0 :                                 unlock_column(tr->store, c->base.id);
    2795           0 :                                 return 0;
    2796             :                         }
    2797           9 :                         BATiter bi = bat_iterator(b);
    2798           9 :                         if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
    2799           9 :                                 const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
    2800           9 :                                 size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
    2801             : 
    2802           9 :                                 if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
    2803           0 :                                         _DELETE(c->min);
    2804           0 :                                         _DELETE(c->max);
    2805             :                                 } else {
    2806           9 :                                         memcpy(c->min, nmin, minlen);
    2807           9 :                                         memcpy(c->max, nmax, maxlen);
    2808           9 :                                         ok = 1;
    2809             :                                 }
    2810             :                         }
    2811           9 :                         bat_iterator_end(&bi);
    2812           9 :                         bat_destroy(b);
    2813             :                 }
    2814           9 :                 unlock_column(tr->store, c->base.id);
    2815             :         }
    2816             :         return ok;
    2817             : }
    2818             : 
    /*
     * Return the number of segments in the chain starting at `s`, following
     * the atomic `next` pointers; a NULL `s` yields 0.
     */
    2819             : static size_t
    2820          17 : count_segs(segment *s)
    2821             : {
    2822          17 :         size_t nr = 0;
    2823             : 
    2824          72 :         for( ; s; s = ATOMIC_PTR_GET(&s->next))
    2825          55 :                 nr++;
    2826          17 :         return nr;
    2827             : }
    2828             : 
    /*
     * Return a counter for table `t` as seen by transaction `tr`; which one is
     * selected by `access`:
     *    2     -> d->cs.ucnt of the table's storage
     *    1     -> count_inserts() over the segment list
     *   10     -> number of segments (count_segs())
     *   other  -> count_deletes() over the segment list
     * Returns 0 when `t` is not a table or tab_timestamp_storage() finds no
     * storage for this transaction.
     */
    2829             : static size_t
    2830          34 : count_del(sql_trans *tr, sql_table *t, int access)
    2831             : {
    2832          34 :         storage *d;
    2833             : 
    2834          34 :         if (!isTable(t))
    2835             :                 return 0;
    2836          34 :         d = tab_timestamp_storage(tr, t);
    2837          34 :         if (!d)
    2838             :                 return 0;
    2839          34 :         if (access == 2)
    2840           0 :                 return d->cs.ucnt;
    2841          34 :         if (access == 1)
    2842           0 :                 return count_inserts(d->segs->h, tr);
    2843          34 :         if (access == 10) /* special case for counting the number of segments */
    2844          17 :                 return count_segs(d->segs->h);
    2845          17 :         return count_deletes(d->segs->h, tr);
    2846             : }
    2847             : 
    /*
     * Report whether column `col` is known to be ordered.
     *
     * Returns non-zero when the column's BAT (bound with QUICK access) has
     * either its tsorted or its trevsorted flag set.  Returns 0 when `col` is
     * NULL, does not belong to a table with a schema, has no data, has a
     * storage_type set, or cannot be bound.
     */
    2848             : static int
    2849       26283 : sorted_col(sql_trans *tr, sql_column *col)
    2850             : {
    2851       26283 :         int sorted = 0;
    2852             : 
    2853       26283 :         assert(tr->active);
                               /* test col before col->t is touched; the NULL test further down came too late */
    2854             :         if (!col || !isTable(col->t) || !col->t->s)
    2855             :                 return 0;
    2856             : 
    2857       26283 :         if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
    2858       26264 :                 BAT *b = bind_col(tr, col, QUICK);
    2859             : 
    2860       26264 :                 if (b)
    2861       26264 :                         sorted = b->tsorted || b->trevsorted;
    2862             :         }
    2863             :         return sorted;
    2864             : }
    2865             : 
    /*
     * Report whether column `col` is known to hold unique values.
     *
     * Returns the tkey flag of the column's BAT (bound with QUICK access).
     * Returns 0 when `col` is NULL, does not belong to a table with a schema,
     * has no data, or cannot be bound.
     */
    2866             : static int
    2867        7230 : unique_col(sql_trans *tr, sql_column *col)
    2868             : {
    2869        7230 :         int distinct = 0;
    2870             : 
    2871        7230 :         assert(tr->active);
                               /* test col before col->t is touched; the NULL test further down came too late */
    2872             :         if (!col || !isTable(col->t) || !col->t->s)
    2873             :                 return 0;
    2874             : 
    2875        7230 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2876        7230 :                 BAT *b = bind_col(tr, col, QUICK);
    2877             : 
    2878        7230 :                 if (b)
    2879        7230 :                         distinct = b->tkey;
    2880             :         }
    2881             :         return distinct;
    2882             : }
    2883             : 
    /*
     * Return a small integer describing duplicate elimination for `col`.
     *
     *  - Column with a storage_type and ST_DICT delta: 1 when the QUICK-bound
     *    BAT has tail type bte, 2 when it is sht, 0 otherwise.
     *  - Otherwise, for a string column with data: 0 unless
     *    GDK_ELIMDOUBLES() holds for the BAT's string heap, in which case the
     *    result is ceil(heap free bytes / GDK_VAROFFSET); an assert documents
     *    the expected range 0..16.
     *  - 0 in every other case, including a NULL `col` or a column that does
     *    not belong to a table with a schema.
     */
    2884             : static int
    2885        1716 : double_elim_col(sql_trans *tr, sql_column *col)
    2886             : {
    2887        1716 :         int de = 0;
    2888        1716 :         sql_delta *d;
    2889             : 
    2890        1716 :         assert(tr->active);
                               /* test col before col->t is touched; the NULL tests further down came too late */
    2891        1716 :         if (!col || !isTable(col->t) || !col->t->s)
    2892             :                 return 0;
    2893             : 
    2894        1716 :         if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
    2895           6 :                 if (d->cs.st == ST_DICT) {
    2896           6 :                         BAT *b = bind_col(tr, col, QUICK);
    2897           6 :                         if (b && b->ttype == TYPE_bte)
    2898             :                                 de = 1;
    2899           0 :                         else if (b && b->ttype == TYPE_sht)
    2900        1716 :                                 de = 2;
    2901             :                 }
    2902        1710 :         } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
    2903        1710 :                 BAT *b = bind_col(tr, col, QUICK);
    2904             : 
    2905        1710 :                 if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
    2906        1710 :                         de = GDK_ELIMDOUBLES(b->tvheap);
    2907        1710 :                         if (de)
    2908        1484 :                                 de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
    2909             :                 }
    2910        1484 :                 assert(de >= 0 && de <= 16);
    2911             :         }
    2912             :         return de;
    2913             : }
    2914             : 
    /*
     * Gather statistics for column `c`.
     *
     * Outputs (all pointers are dereferenced unconditionally):
     *   *nonil, *unique, *unique_est are reset to false / false / 0.0 on entry
     *   and then filled in where the information is available;
     *   min / max are initialised with VALinit() when a value is known.
     *
     * Return value is a bit mask: 1 is set when `min` was initialised, 2 when
     * `max` was.  It is 0 when `c` is NULL, not part of a table with a schema,
     * has no data, cannot be bound, or uses ST_FOR storage (in which case only
     * *nonil is set to true).
     *
     * Details:
     *  - min/max are only produced for the type classes tested through the
     *    EC_* macros below, when the delta has no pending updates (ucnt == 0)
     *    and the BAT knows at least one of minpos/maxpos.  A cached c->min /
     *    c->max is tried first, then the value at minpos / maxpos.
     *  - with ucnt == 0, *unique and *unique_est come from the base BAT for
     *    ST_DEFAULT (BATguess_uniques() is used when the stored estimate is
     *    0), and from the QUICK-bound BAT for ST_DICT.
     *  - with ucnt > 0 and a nil-free base, *nonil additionally requires the
     *    update-values BAT to be nil-free; if that BAT cannot be bound *nonil
     *    becomes false.
     */
    2915             : static int
    2916     3710629 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
    2917             : {
    2918     3710629 :         int ok = 0;
    2919     3710629 :         BAT *b = NULL, *off = NULL, *upv = NULL;
    2920     3710629 :         sql_delta *d = NULL;
    2921             : 
    2922     3710629 :         (void) tr;
    2923     3710629 :         assert(tr->active);
    2924     3710629 :         *nonil = false;
    2925     3710629 :         *unique = false;
    2926     3710629 :         *unique_est = 0.0;
    2927     3710629 :         if (!c || !isTable(c->t) || !c->t->s)
    2928             :                 return ok;
    2929             : 
    2930     3710422 :         if ((d = ATOMIC_PTR_GET(&c->data))) {
    2931     3673554 :                 if (d->cs.st == ST_FOR) {
    2932          30 :                         *nonil = true; /* TODO for min/max. I will do it later */
    2933          30 :                         return ok;
    2934             :                 }
    2935     3673524 :                 int eclass = c->type.type->eclass;
    2936     3673524 :                 int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
    2937     3673524 :                 if ((b = bind_col(tr, c, access))) {
    2938     3673313 :                         if (!(b = bind_no_view(b, false)))
    2939           0 :                                 return ok;
    2940     3674177 :                         BATiter bi = bat_iterator(b);
    2941     3673368 :                         *nonil = bi.nonil && !bi.nil;
    2942             : 
    2943     3673368 :                         if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
    2944     3382204 :                                 d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
                                                       /* prefer the cached bound, fall back to the BAT's own position */
    2945     2226512 :                                 if (c->min && VALinit(min, bi.type, c->min))
    2946             :                                         ok |= 1;
    2947     2226496 :                                 else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
    2948     2218033 :                                         ok |= 1;
    2949     2226402 :                                 if (c->max && VALinit(max, bi.type, c->max))
    2950          16 :                                         ok |= 2;
    2951     2226388 :                                 else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
    2952     2212532 :                                         ok |= 2;
    2953             :                         }
    2954     3673227 :                         if (d->cs.ucnt == 0) {
    2955     3670620 :                                 if (d->cs.st == ST_DEFAULT) {
    2956     3669915 :                                         *unique = bi.key;
    2957     3669915 :                                         *unique_est = bi.unique_est;
    2958     3669915 :                                         if (*unique_est == 0)
    2959     1167295 :                                                 *unique_est = (double)BATguess_uniques(b,NULL);
    2960         705 :                                 } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
    2961             :                                         /* for dict, check the offsets bat for uniqueness */
    2962         705 :                                         MT_lock_set(&off->theaplock);
    2963         705 :                                         *unique = off->tkey;
    2964         705 :                                         *unique_est = off->tunique_est;
    2965         705 :                                         MT_lock_unset(&off->theaplock);
    2966             :                                 }
    2967             :                         }
    2968     3673860 :                         bat_iterator_end(&bi);
    2969     3673483 :                         bat_destroy(b);
    2970     3673230 :                         if (*nonil && d->cs.ucnt > 0) {
    2971             :                                 /* This could use a quick descriptor */
    2972         519 :                                 if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
    2973           0 :                                         *nonil = false;
    2974             :                                 } else {
    2975         519 :                                         MT_lock_set(&upv->theaplock);
    2976         519 :                                         *nonil &= upv->tnonil && !upv->tnil;
    2977         519 :                                         MT_lock_unset(&upv->theaplock);
    2978         519 :                                         bat_destroy(upv);
    2979             :                                 }
    2980             :                         }
    2981             :                 }
    2982             :         }
    2983             :         return ok;
    2984             : }
    2985             : 
    /*
     * Attach or remove the partition range properties on the BAT of `col`.
     *
     * add_range == true : sets GDK_MIN_BOUND to pt's range minimum; sets
     *                     GDK_MAX_BOUND to the range maximum unless that
     *                     maximum is the nil value, in which case the property
     *                     is removed; sets GDK_NOT_NULL when the partition
     *                     does not accept nils or the column is not nullable.
     * add_range == false: removes all three properties.
     *
     * The properties are changed while holding the BAT's theaplock.  Always
     * returns LOG_OK; nothing is done when `col` is NULL, does not belong to a
     * table with a schema, has no data, or cannot be bound.
     */
    2986             : static int
    2987         257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
    2988             : {
    2989         257 :         assert(tr->active);
                               /* test col before col->t is touched; the NULL test further down came too late */
    2990         257 :         if (!col || !isTable(col->t) || !col->t->s)
    2991             :                 return LOG_OK;
    2992             : 
    2993         252 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    2994         252 :                 BAT *b = bind_col(tr, col, QUICK);
    2995             : 
    2996         252 :                 if (b) { /* add props for ranges [min, max> */
    2997         252 :                         MT_lock_set(&b->theaplock);
    2998         252 :                         if (add_range) {
    2999         179 :                                 BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
    3000         179 :                                 if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
    3001         103 :                                         BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
    3002             :                                 else
    3003          76 :                                         BATrmprop_nolock(b, GDK_MAX_BOUND);
    3004         179 :                                 if (!pt->with_nills || !col->null)
    3005         117 :                                         BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    3006             :                         } else {
    3007          73 :                                 BATrmprop_nolock(b, GDK_MIN_BOUND);
    3008          73 :                                 BATrmprop_nolock(b, GDK_MAX_BOUND);
    3009          73 :                                 BATrmprop_nolock(b, GDK_NOT_NULL);
    3010             :                         }
    3011         252 :                         MT_lock_unset(&b->theaplock);
    3012             :                 }
    3013             :         }
    3014             :         return LOG_OK;
    3015             : }
    3016             : 
    /*
     * Set (not_null == true) or remove (not_null == false) the GDK_NOT_NULL
     * property on the BAT of `col`, bound with QUICK access.
     *
     * Always returns LOG_OK; nothing is done when `col` is NULL, does not
     * belong to a table with a schema, has no data, or cannot be bound.
     */
    3017             : static int
    3018        4313 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
    3019             : {
    3020        4313 :         assert(tr->active);
                               /* test col before col->t is touched; the NULL test further down came too late */
    3021        4313 :         if (!col || !isTable(col->t) || !col->t->s)
    3022             :                 return LOG_OK;
    3023             : 
    3024        4283 :         if (col && ATOMIC_PTR_GET(&col->data)) {
    3025        4283 :                 BAT *b = bind_col(tr, col, QUICK);
    3026             : 
    3027        4283 :                 if (b) { /* set or clear the NOT NULL property */
    3028        4283 :                         if (not_null) {
    3029        4281 :                                 BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
    3030             :                         } else {
    3031           2 :                                 BATrmprop(b, GDK_NOT_NULL);
    3032             :                         }
    3033             :                 }
    3034             :         }
    3035             :         return LOG_OK;
    3036             : }
    3037             : 
    /*
     * Initialise the column storage `cs` from the BAT the logger has
     * registered under `id`.
     *
     * cs->bid receives temp_dup() of the logger's BAT id, cs->ucnt is reset to
     * 0, and cs->uibid / cs->uvbid are set to e_bat() results for TYPE_oid and
     * for `type` respectively.
     *
     * Returns LOG_ERR when the logger has no BAT for `id` (bid <= 0) or when
     * either e_bat() call yields BID_NIL, LOG_OK otherwise.
     * NOTE(review): on the BID_NIL path cs->bid has already been set and is
     * not released here -- presumably the caller cleans up; confirm.
     */
    3038             : static int
    3039       30322 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
    3040             : {
    3041       30322 :         sqlstore *store = tr->store;
    3042       30322 :         int bid = log_find_bat(store->logger, id);
    3043       30322 :         if (bid <= 0)
    3044             :                 return LOG_ERR;
    3045       30322 :         cs->bid = temp_dup(bid);
    3046       30322 :         cs->ucnt = 0;
    3047       30322 :         cs->uibid = e_bat(TYPE_oid);
    3048       30322 :         cs->uvbid = e_bat(type);
    3049       30322 :         if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
    3050             :                 return LOG_ERR;
    3051             :         return LOG_OK;
    3052             : }
    3053             : 
    /*
     * Prepare the delta `bat` for persistence and register its base BAT with
     * the logger under `id`.
     *
     * Missing update BATs (uibid / uvbid equal to 0) are filled in with
     * e_bat(); a BID_NIL result is remembered as LOG_ERR.  When
     * GDKinmemory(0) holds, nothing is logged and that status is returned
     * directly.  Otherwise the base BAT is switched to BAT_READ access and
     * handed to log_bat_persists().
     *
     * Returns LOG_ERR when the base BAT cannot be obtained, when an update BAT
     * could not be created, or when log_bat_persists() fails; LOG_OK
     * otherwise.  Note that log_bat_persists() is still called after an
     * e_bat() failure; the earlier error takes precedence in the result.
     */
    3054             : static int
    3055       65930 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
    3056             : {
    3057       65930 :         int res = LOG_OK;
    3058       65930 :         gdk_return ok;
    3059       65930 :         BAT *b = temp_descriptor(bat->cs.bid);
    3060             : 
    3061       65930 :         if (b == NULL)
    3062             :                 return LOG_ERR;
    3063             : 
    3064       65930 :         if (!bat->cs.uibid)
    3065       65924 :                 bat->cs.uibid = e_bat(TYPE_oid);
    3066       65930 :         if (!bat->cs.uvbid)
    3067       65924 :                 bat->cs.uvbid = e_bat(b->ttype);
    3068       65930 :         if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
    3069           0 :                 res = LOG_ERR;
    3070       65930 :         if (GDKinmemory(0)) {
    3071         174 :                 bat_destroy(b);
    3072         174 :                 return res;
    3073             :         }
    3074             : 
    3075       65756 :         bat_set_access(b, BAT_READ);
    3076       65756 :         sqlstore *store = tr->store;
    3077       65756 :         ok = log_bat_persists(store->logger, b, id);
    3078       65756 :         bat_destroy(b);
    3079       65756 :         if(res != LOG_OK)
    3080             :                 return res;
    3081       65756 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3082             : }
    3083             : 
    /*
     * new_persistent_delta: reset the update counter (cs.ucnt) of `bat` to 0.
     * Always returns LOG_OK.
     */
    3084             : static int
    3085           0 : new_persistent_delta( sql_delta *bat)
    3086             : {
    3087           0 :         bat->cs.ucnt = 0;
    3088           0 :         return LOG_OK;
    3089             : }
    3090             : 
    /*
     * create_delta: initialise delta `d` around BAT `b`.
     *
     * Sets `b` to BAT_READ, stores the id returned by temp_create(b) in
     * d->cs.bid, and clears the update BAT ids (uibid, uvbid) and the
     * update counter. `b` itself is not released here; the caller in
     * create_col() calls bat_destroy(b) after this returns.
     */
    3091             : static void
    3092      124665 : create_delta( sql_delta *d, BAT *b)
    3093             : {
    3094      124665 :         bat_set_access(b, BAT_READ);
    3095      124665 :         d->cs.bid = temp_create(b);
    3096      124665 :         d->cs.uibid = d->cs.uvbid = 0;
    3097      124665 :         d->cs.ucnt = 0;
    3098      124665 : }
    3099             : 
    /*
     * copyBat: build a new BAT with as many rows as BAT `i`, every row
     * holding the value ATOMnilptr(type) points to.
     *
     * Despite the name no values are copied from `i`; only BATcount() of
     * its descriptor is used. The new BAT is created by BATconstant() with
     * seqbase `seq` and role PERSISTENT, set to BAT_READ, registered with
     * temp_create() and its descriptor released.
     *
     * Returns the id from temp_create(); returns `i` unchanged when `i` is
     * 0, and 0 when the descriptor lookup or BATconstant() fails.
     *
     * NOTE(review): failure is reported as 0, while the callers in
     * create_col()/create_idx() compare the result against BID_NIL --
     * confirm that the two values agree.
     */
    3100             : static bat
    3101        7020 : copyBat (bat i, int type, oid seq)
    3102             : {
    3103        7020 :         BAT *b, *tb;
    3104        7020 :         bat res;
    3105             : 
    3106        7020 :         if (!i)
    3107             :                 return i;
    3108        7020 :         tb = quick_descriptor(i);
    3109        7020 :         if (tb == NULL)
    3110             :                 return 0;
    3111        7020 :         b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
    3112        7020 :         if (b == NULL)
    3113             :                 return 0;
    3114             : 
    3115        7020 :         bat_set_access(b, BAT_READ);
    3116             : 
    3117        7020 :         res = temp_create(b);
    3118        7020 :         bat_destroy(b);
    3119        7020 :         return res;
    3120             : }
    3121             : 
    /*
     * create_col: set up the sql_delta behind column `c`.
     *
     * If c->data is still NULL a zeroed sql_delta is allocated, published
     * in c->data, given refcnt 1 and stamped with tr->tid. Three cases
     * follow:
     *
     * 1. `c` is not new and its table is not temporary: the delta is
     *    stamped with tr->ts and loaded from the logger via load_cs().
     *    If c->storage_type is "DICT" the BAT logged under -c->base.id is
     *    attached as cs.ebid and cs.st becomes ST_DICT; a storage type
     *    starting with "FOR" sets cs.st to ST_FOR.
     * 2. The delta already has a bid: only new_persistent_delta() is run.
     * 3. Otherwise a fresh BAT is created. For non-temporary tables the
     *    row count `cnt` is taken from segs_end() on the table storage; if
     *    it is non-zero and `c` is not the first column, the BAT is built
     *    by copyBat() from the first column's bid (and update BATs are
     *    created when the first column has them). In all other cases an
     *    empty BAT from bat_new() is installed via create_delta(). A newly
     *    allocated delta on a non-temporary, non-new table ("alter") is
     *    registered with trans_add_obj().
     *
     * Returns LOG_OK or LOG_ERR.
     */
    3122             : static int
    3123      147911 : create_col(sql_trans *tr, sql_column *c)
    3124             : {
    3125      147911 :         int ok = LOG_OK, new = 0;
    3126      147911 :         int type = c->type.type->localtype;
    3127      147911 :         sql_delta *bat = ATOMIC_PTR_GET(&c->data);
    3128             : 
    3129      147911 :         if (!bat) {
    3130      147911 :                 new = 1;
    3131      147911 :                 bat = ZNEW(sql_delta);
    3132      147911 :                 if (!bat)
    3133             :                         return LOG_ERR;
                                       /* NOTE(review): create_idx()/create_del() publish their
                                        * storage with ATOMIC_PTR_INIT; this uses ATOMIC_PTR_SET --
                                        * confirm the difference is intended. */
    3134      147911 :                 ATOMIC_PTR_SET(&c->data, bat);
    3135      147911 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3136             :         }
    3137             : 
    3138      147911 :         if (new)
    3139      147911 :                 bat->cs.ts = tr->tid;
    3140             : 
    3141      147911 :         if (!isNew(c)&& !isTempTable(c->t)){
    3142       23226 :                 bat->cs.ts = tr->ts;
    3143       23226 :                 ok = load_cs(tr, &bat->cs, type, c->base.id);
    3144       23226 :                 if (ok == LOG_OK && c->storage_type) {
    3145           4 :                         if (strcmp(c->storage_type, "DICT") == 0) {
    3146           2 :                                 sqlstore *store = tr->store;
                                                       /* the dictionary BAT is looked up under the negated column id */
    3147           2 :                                 int bid = log_find_bat(store->logger, -c->base.id);
    3148           2 :                                 if (bid <= 0)
    3149             :                                         return LOG_ERR;
    3150           2 :                                 bat->cs.ebid = temp_dup(bid);
    3151           2 :                                 bat->cs.st = ST_DICT;
    3152           2 :                         } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
    3153           2 :                                 bat->cs.st = ST_FOR;
    3154             :                         }
    3155             :                 }
    3156       23226 :                 return ok;
                               /* NOTE(review): `bat` cannot be NULL here (allocated or returned
                                * above), so the `bat &&` test is redundant. */
    3157      124685 :         } else if (bat && bat->cs.bid) {
    3158           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
    3159             :         } else {
    3160      124685 :                 sql_column *fc = NULL;
    3161      124685 :                 size_t cnt = 0;
    3162             : 
    3163             :                 /* alter ? */
    3164      124685 :                 if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
    3165       77066 :                         storage *s = tab_timestamp_storage(tr, fc->t);
    3166       77066 :                         if (s == NULL)
    3167             :                                 return LOG_ERR;
    3168       77066 :                         cnt = segs_end(s->segs, tr, c->t);
    3169             :                 }
    3170      124685 :                 if (cnt && fc != c) {
                                       /* NOTE(review): d is dereferenced without a NULL check --
                                        * presumably the first column always has data here; confirm. */
    3171          20 :                         sql_delta *d = ATOMIC_PTR_GET(&fc->data);
    3172             : 
    3173          20 :                         if (d->cs.bid) {
    3174          20 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3175          20 :                                 if(bat->cs.bid == BID_NIL)
    3176          20 :                                         ok = LOG_ERR;
    3177             :                         }
    3178          20 :                         if (d->cs.uibid) {
    3179           8 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3180           8 :                                 if (bat->cs.uibid == BID_NIL)
    3181          20 :                                         ok = LOG_ERR;
    3182             :                         }
    3183          20 :                         if (d->cs.uvbid) {
    3184           8 :                                 bat->cs.uvbid = e_bat(type);
    3185           8 :                                 if(bat->cs.uvbid == BID_NIL)
    3186           0 :                                         ok = LOG_ERR;
    3187             :                         }
    3188             :                 } else {
    3189      124665 :                         BAT *b = bat_new(type, c->t->sz, PERSISTENT);
    3190      124665 :                         if (!b) {
    3191             :                                 ok = LOG_ERR;
    3192             :                         } else {
    3193      124665 :                                 create_delta(ATOMIC_PTR_GET(&c->data), b);
    3194      124665 :                                 bat_destroy(b);
    3195             :                         }
    3196             : 
    3197      124664 :                         if (!new) {
    3198           0 :                                 bat->cs.uibid = e_bat(TYPE_oid);
    3199           0 :                                 if (bat->cs.uibid == BID_NIL)
    3200           0 :                                         ok = LOG_ERR;
    3201           0 :                                 bat->cs.uvbid = e_bat(type);
    3202           0 :                                 if(bat->cs.uvbid == BID_NIL)
    3203           0 :                                         ok = LOG_ERR;
    3204             :                         }
    3205             :                 }
    3206      124684 :                 bat->cs.ucnt = 0;
    3207             : 
    3208      124684 :                 if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
    3209          76 :                         trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
    3210             :         }
    3211             :         return ok;
    3212             : }
    3213             : 
    /*
     * log_create_col_: log the creation of column `c` by passing its delta
     * (c->data) and id (c->base.id) to log_create_delta(). Asserts that the
     * column's table is not a temporary table. Returns that call's result.
     */
    3214             : static int
    3215       59690 : log_create_col_(sql_trans *tr, sql_column *c)
    3216             : {
    3217       59690 :         assert(!isTempTable(c->t));
    3218       59690 :         return log_create_delta(tr,  ATOMIC_PTR_GET(&c->data), c->base.id);
    3219             : }
    3220             : 
    /*
     * log_create_col: sql_change adapter around log_create_col_(); treats
     * change->obj as the sql_column to log. It is the log callback that
     * create_col() registers through trans_add_obj().
     */
    3221             : static int
    3222          72 : log_create_col(sql_trans *tr, sql_change *change)
    3223             : {
    3224          72 :         return log_create_col_(tr, (sql_column*)change->obj);
    3225             : }
    3226             : 
    /*
     * commit_create_delta: finish the creation of a delta at commit time.
     *
     * Stamps the delta with `commit_ts` (it is asserted to carry tr->tid
     * beforehand and to have no `next` delta), runs merge_delta() unless
     * cs.merged is already set, and clears base->new when the transaction
     * has no parent. `t` and `oldest` are unused. Always returns LOG_OK.
     */
    3227             : static int
    3228      112490 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
    3229             : {
    3230      112490 :         (void) t; // TODO transaction_layer_revamp: remove if unnecessary
    3231      112490 :         (void)oldest;
    3232      112490 :         assert(delta->cs.ts == tr->tid);
    3233      112490 :         delta->cs.ts = commit_ts;
    3234             : 
    3235      112490 :         assert(delta->next == NULL);
    3236      112490 :         if (!delta->cs.merged)
    3237      112489 :                 merge_delta(delta);
    3238      112490 :         if (!tr->parent)
    3239      112486 :                 base->new = 0;
    3240      112490 :         return LOG_OK;
    3241             : }
    3242             : 
    /*
     * commit_create_col: commit callback for a created column; treats
     * change->obj as the sql_column and forwards its delta to
     * commit_create_delta().
     *
     * NOTE(review): c->base.new is cleared here and again inside
     * commit_create_delta() (which receives &c->base); the assignment here
     * looks redundant.
     */
    3243             : static int
    3244          76 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3245             : {
    3246          76 :         sql_column *c = (sql_column*)change->obj;
    3247          76 :         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3248          76 :         if (!tr->parent)
    3249          75 :                 c->base.new = 0;
    3250          76 :         return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
    3251             : }
    3252             : 
    /*
     * create_idx: set up the sql_delta behind index `ni`.
     *
     * The index BAT type is TYPE_oid when oid_index(ni->type) is true and
     * TYPE_lng otherwise. If ni->data is still NULL a zeroed sql_delta is
     * allocated, published in ni->data, given refcnt 1 and stamped with
     * tr->tid. Then:
     *
     * 1. Index not new and table not temporary: the delta gets ts 1 and is
     *    loaded via load_cs().
     * 2. Delta already has a bid and table not temporary: only
     *    new_persistent_delta() is run.
     * 3. Otherwise the BAT is built with copyBat() from the bid of the
     *    table's first column (as found by col_timestamp_delta()), so that
     *    it has the same number of rows. A newly allocated delta on a
     *    non-temporary, non-new table ("alter") is registered with
     *    trans_add_obj().
     *
     * Returns LOG_OK or LOG_ERR.
     *
     * NOTE(review): case 1 stamps the delta with the constant 1, whereas
     * create_col() uses tr->ts in the same situation -- confirm intended.
     */
    3253             : /* will be called for new idx's and when new index columns are created */
    3254             : static int
    3255        9280 : create_idx(sql_trans *tr, sql_idx *ni)
    3256             : {
    3257        9280 :         int ok = LOG_OK, new = 0;
    3258        9280 :         sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
    3259        9280 :         int type = TYPE_lng;
    3260             : 
    3261        9280 :         if (oid_index(ni->type))
    3262         935 :                 type = TYPE_oid;
    3263             : 
    3264        9280 :         if (!bat) {
    3265        9280 :                 new = 1;
    3266        9280 :                 bat = ZNEW(sql_delta);
    3267        9280 :                 if (!bat)
    3268             :                         return LOG_ERR;
    3269        9280 :                 ATOMIC_PTR_INIT(&ni->data, bat);
    3270        9280 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3271             :         }
    3272             : 
    3273        9280 :         if (new)
    3274        9280 :                 bat->cs.ts = tr->tid;
    3275             : 
    3276        9280 :         if (!isNew(ni) && !isTempTable(ni->t)){
    3277        2280 :                 bat->cs.ts = 1;
    3278        2280 :                 return load_cs(tr, &bat->cs, type, ni->base.id);
    3279        7000 :         } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
    3280           0 :                 return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
    3281             :         } else {
                                       /* NOTE(review): ol_first_node() is dereferenced without a NULL
                                        * check (create_col() does check it) -- presumably an indexed
                                        * table always has a column; confirm. */
    3282        7000 :                 sql_column *c = ol_first_node(ni->t->columns)->data;
    3283        7000 :                 sql_delta *d = col_timestamp_delta(tr, c);
    3284             : 
    3285        7000 :                 if (d) {
    3286             :                         /* Here we also handle indices created through alter stmts */
    3287             :                         /* These need to be created aligned to the existing data */
    3288        7000 :                         if (d->cs.bid) {
    3289        7000 :                                 bat->cs.bid = copyBat(d->cs.bid, type, 0);
    3290        7000 :                                 if(bat->cs.bid == BID_NIL)
    3291        7000 :                                         ok = LOG_ERR;
    3292             :                         }
    3293             :                 } else {
    3294             :                         return LOG_ERR;
    3295             :                 }
    3296             : 
    3297        7000 :                 bat->cs.ucnt = 0;
    3298             : 
    3299        7000 :                 if (!new) {
    3300           0 :                         bat->cs.uibid = e_bat(TYPE_oid);
    3301           0 :                         if (bat->cs.uibid == BID_NIL)
    3302           0 :                                 ok = LOG_ERR;
    3303           0 :                         bat->cs.uvbid = e_bat(type);
    3304           0 :                         if(bat->cs.uvbid == BID_NIL)
    3305           0 :                                 ok = LOG_ERR;
    3306             :                 }
                                       /* NOTE(review): ucnt was already set to 0 a few lines above. */
    3307        7000 :                 bat->cs.ucnt = 0;
    3308        7000 :                 if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
    3309         619 :                         trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
    3310             :         }
    3311             :         return ok;
    3312             : }
    3313             : 
    /*
     * log_create_idx_: log the creation of index `i` by passing its delta
     * (i->data) and id (i->base.id) to log_create_delta(). Asserts that the
     * index's table is not a temporary table. Returns that call's result.
     */
    3314             : static int
    3315        6240 : log_create_idx_(sql_trans *tr, sql_idx *i)
    3316             : {
    3317        6240 :         assert(!isTempTable(i->t));
    3318        6240 :         return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3319             : }
    3320             : 
    /*
     * log_create_idx: sql_change adapter around log_create_idx_(); treats
     * change->obj as the sql_idx to log. It is the log callback that
     * create_idx() registers through trans_add_obj().
     */
    3321             : static int
    3322         606 : log_create_idx(sql_trans *tr, sql_change *change)
    3323             : {
    3324         606 :         return log_create_idx_(tr, (sql_idx*)change->obj);
    3325             : }
    3326             : 
    /*
     * commit_create_idx: commit callback for a created index; treats
     * change->obj as the sql_idx and forwards its delta to
     * commit_create_delta().
     *
     * NOTE(review): i->base.new is cleared here and again inside
     * commit_create_delta(); the assignment here looks redundant.
     */
    3327             : static int
    3328         619 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3329             : {
    3330         619 :         sql_idx *i = (sql_idx*)change->obj;
    3331         619 :         sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3332         619 :         if (!tr->parent)
    3333         619 :                 i->base.new = 0;
    3334         619 :         return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
                               /* NOTE(review): unreachable -- the statement above always returns. */
    3335             :         return LOG_OK;
    3336             : }
    3337             : 
    /*
     * load_storage: load the deletion BAT of table `t` from the logger and
     * rebuild the segment list of storage `s` from it.
     *
     * Steps:
     * - load_cs() binds the BAT logged under `id` as a TYPE_msk column.
     * - The BAT is fetched as `ib`; if it is of type TYPE_msk or a mask
     *   candidate list, BATunmask() produces `b`, otherwise b == ib.
     * - If `b` is non-empty, a segment list sized BATcount(ib) is created
     *   and the oids in `b` are applied as deleted ranges:
     *     * a dense BAT is one range starting at b->tseqbase;
     *     * otherwise the (asserted sorted) oids are scanned and runs of
     *       consecutive values are coalesced; each finished run goes to
     *       seg_delete_range(), the final run to delete_range().
     *   Afterwards every segment stamped with tr->tid is restamped to 1.
     * - If `b` is empty, a segment list sized by the BAT in s->cs.bid is
     *   created and its head segment is restamped to 1 when it carries
     *   tr->tid.
     *
     * Returns LOG_OK or LOG_ERR (or the failing helper's result).
     */
    3338             : static int
    3339        4816 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
    3340             : {
    3341        4816 :         int ok = load_cs(tr, &s->cs, TYPE_msk, id);
    3342        4816 :         BAT *b = NULL, *ib = NULL;
    3343             : 
    3344        4816 :         if (ok != LOG_OK)
    3345             :                 return ok;
    3346        4816 :         if (!(b = temp_descriptor(s->cs.bid)))
    3347             :                 return LOG_ERR;
    3348        4816 :         ib = b;
    3349             : 
    3350        4816 :         if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
    3351           0 :                 bat_destroy(ib);
    3352           0 :                 return LOG_ERR;
    3353             :         }
    3354             : 
    3355        4816 :         if (BATcount(b)) {
                                       /* NOTE(review): this error return releases only ib; when
                                        * BATunmask() produced a separate b it is not released here. */
    3356         328 :                 if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
    3357           0 :                         bat_destroy(ib);
    3358           0 :                         return LOG_ERR;
    3359             :                 }
    3360         505 :                 if (BATtdense(b)) {
    3361         177 :                         size_t start = b->tseqbase;
    3362         177 :                         size_t cnt = BATcount(b);
    3363         177 :                         ok = delete_range(tr, t, s, start, cnt);
    3364             :                 } else {
    3365         151 :                         assert(b->tsorted);
    3366         151 :                         BUN icnt = BATcount(b);
    3367         151 :                         BATiter bi = bat_iterator(b);
                                               /* lcnt: length of the current run of consecutive oids;
                                                * n: the oid that would extend it, so the run starts at
                                                * n - lcnt. */
    3368         151 :                         size_t lcnt = 1;
    3369         151 :                         oid n;
    3370         151 :                         segment *seg = s->segs->h;
    3371         151 :                         if (complex_cand(b)) {
                                                       /* positions are read through Tpos() rather than from
                                                        * a plain oid array */
    3372           0 :                                 oid o = * (oid *) Tpos(&bi, 0);
    3373           0 :                                 n = o + 1;
    3374           0 :                                 for (BUN i = 1; i < icnt; i++) {
    3375           0 :                                         o = * (oid *) Tpos(&bi, i);
    3376           0 :                                         if (o == n) {
    3377           0 :                                                 lcnt++;
    3378           0 :                                                 n++;
    3379             :                                         } else {
    3380           0 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3381             :                                                         break;
    3382             :                                                 lcnt = 0;
    3383             :                                         }
    3384           0 :                                         if (!lcnt) {
    3385           0 :                                                 n = o + 1;
    3386           0 :                                                 lcnt = 1;
    3387             :                                         }
    3388             :                                 }
    3389             :                         } else {
    3390         151 :                                 oid *o = bi.base;
    3391         151 :                                 n = o[0]+1;
    3392      207956 :                                 for (size_t i=1; i<icnt; i++) {
    3393      207805 :                                         if (o[i] == n) {
    3394      205854 :                                                 lcnt++;
    3395      205854 :                                                 n++;
    3396             :                                         } else {
                                                                       /* gap found: flush the finished run */
    3397        1951 :                                                 if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
    3398             :                                                         break;
    3399             :                                                 lcnt = 0;
    3400             :                                         }
    3401      207805 :                                         if (!lcnt) {
                                                                       /* start a new run at the current oid */
    3402        1951 :                                                 n = o[i]+1;
    3403        1951 :                                                 lcnt = 1;
    3404             :                                         }
    3405             :                                 }
    3406             :                         }
                                               /* flush the last run, unless an earlier range failed */
    3407         151 :                         if (lcnt && ok == LOG_OK)
    3408         151 :                                 ok = delete_range(tr, t, s, n-lcnt, lcnt);
    3409         151 :                         bat_iterator_end(&bi);
    3410             :                 }
    3411         328 :                 if (ok == LOG_OK)
    3412        5047 :                         for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
    3413        4719 :                                 if (seg->ts == tr->tid)
    3414        2450 :                                         seg->ts = 1;
    3415             :         } else {
    3416        4488 :                 if (ok == LOG_OK) {
    3417        4488 :                         BAT *bb = quick_descriptor(s->cs.bid);
    3418             : 
    3419        4488 :                         if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
    3420             :                                 ok = LOG_ERR;
    3421             :                         } else {
    3422        4488 :                                 segment *seg = s->segs->h;
    3423        4488 :                                 if (seg->ts == tr->tid)
    3424        4488 :                                         seg->ts = 1;
    3425             :                         }
    3426             :                 }
    3427             :         }
    3428        4816 :         if (b != ib)
    3429        4816 :                 bat_destroy(b);
    3430        4816 :         bat_destroy(ib);
    3431             : 
    3432        4816 :         return ok;
    3433             : }
    3434             : 
    /*
     * create_del: set up the `storage` object (deletion BAT plus segment
     * list) behind table `t`.
     *
     * If t->data is still NULL a zeroed storage is allocated, published in
     * t->data, given refcnt 1 and stamped with tr->tid. Then:
     *
     * 1. Table not new and not temporary: the storage is stamped with
     *    tr->ts and filled by load_storage().
     * 2. Storage already has a bid: nothing more to do.
     * 3. Otherwise an empty segment list (new_segments(tr, 0)) and an
     *    empty TYPE_msk BAT are created; a newly allocated storage is
     *    registered with trans_add_obj(), with no log callback for
     *    temporary tables.
     *
     * Returns LOG_OK or LOG_ERR.
     */
    3435             : static int
    3436       24648 : create_del(sql_trans *tr, sql_table *t)
    3437             : {
    3438       24648 :         int ok = LOG_OK, new = 0;
    3439       24648 :         BAT *b;
    3440       24648 :         storage *bat = ATOMIC_PTR_GET(&t->data);
    3441             : 
    3442       24648 :         if (!bat) {
    3443       24648 :                 new = 1;
    3444       24648 :                 bat = ZNEW(storage);
    3445       24648 :                 if(!bat)
    3446             :                         return LOG_ERR;
    3447       24648 :                 ATOMIC_PTR_INIT(&t->data, bat);
    3448       24648 :                 ATOMIC_INIT(&bat->cs.refcnt, 1);
    3449       24648 :                 bat->cs.ts = tr->tid;
    3450             :         }
    3451             : 
    3452       24648 :         if (!isNew(t) && !isTempTable(t)) {
    3453        4816 :                 bat->cs.ts = tr->ts;
    3454        4816 :                 return load_storage(tr, t, bat, t->base.id);
    3455       19832 :         } else if (bat->cs.bid) {
    3456             :                 return ok;
    3457             :         } else {
    3458       19832 :                 assert(!bat->segs);
    3459       19832 :                 if (!(bat->segs = new_segments(tr, 0)))
    3460             :                         return LOG_ERR;
    3461             : 
    3462       19832 :                 b = bat_new(TYPE_msk, t->sz, PERSISTENT);
    3463       19832 :                 if(b != NULL) {
    3464       19832 :                         bat_set_access(b, BAT_READ);
    3465       19832 :                         bat->cs.bid = temp_create(b);
    3466       19832 :                         bat_destroy(b);
    3467             :                 } else {
    3468             :                         return LOG_ERR;
    3469             :                 }
    3470       19832 :                 if (new)
    3471       26146 :                         trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
    3472             :         }
    3473             :         return ok;
    3474             : }
    3475             : 
    /*
     * log_segment: write one segment to the logger as a constant TYPE_msk
     * value.
     *
     * Calls log_constant() with the segment's `deleted` flag as the value,
     * s->start as the offset and (s->end - s->start) as the count, under
     * id `id`. Returns LOG_OK when log_constant() returns GDK_SUCCEED,
     * LOG_ERR otherwise.
     */
    3476             : static int
    3477      190295 : log_segment(sql_trans *tr, segment *s, sqlid id)
    3478             : {
    3479      190295 :         sqlstore *store = tr->store;
    3480      190295 :         msk m = s->deleted;
    3481      190295 :         return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
    3482             : }
    3483             : 
    /*
     * log_segments: log every non-empty segment of `segs` that is stamped
     * with this transaction's tid.
     *
     * The table lock for `id` is held while the loop reads the list head
     * and steps to seg->next; it is released at the top of each iteration,
     * so log_segment() runs without the lock, and re-taken at the bottom.
     *
     * Returns LOG_OK, or LOG_ERR as soon as log_segment() fails.
     */
    3484             : static int
    3485       94100 : log_segments(sql_trans *tr, segments *segs, sqlid id)
    3486             : {
    3487             :         /* log segments */
    3488       94100 :         lock_table(tr->store, id);
    3489      449747 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    3490      355647 :                 unlock_table(tr->store, id);
    3491      355647 :                 if (seg->ts == tr->tid && seg->end-seg->start) {
    3492      144552 :                         if (log_segment(tr, seg, id) != LOG_OK) {
                                                       /* NOTE(review): the lock was already released at the
                                                        * top of this iteration, so this looks like a second
                                                        * unlock of the same table -- confirm. */
    3493           0 :                                 unlock_table(tr->store, id);
    3494           0 :                                 return LOG_ERR;
    3495             :                         }
    3496             :                 }
    3497      355647 :                 lock_table(tr->store, id);
    3498             :         }
    3499       94100 :         unlock_table(tr->store, id);
    3500       94100 :         return LOG_OK;
    3501             : }
    3502             : 
    /*
     * log_create_storage: log the creation of a table's storage.
     *
     * Nothing is logged when GDKinmemory(0) is true. Otherwise the BAT in
     * bat->cs.bid is set to BAT_READ and passed to log_bat_persists()
     * under t->base.id; on success the segment list is logged with
     * log_segments(). The descriptor is released before returning.
     *
     * Returns LOG_OK or LOG_ERR.
     */
    3503             : static int
    3504       11914 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
    3505             : {
    3506       11914 :         BAT *b;
    3507       11914 :         int ok = LOG_OK;
    3508             : 
    3509       11914 :         if (GDKinmemory(0))
    3510             :                 return LOG_OK;
    3511             : 
    3512       11882 :         b = temp_descriptor(bat->cs.bid);
    3513       11882 :         if (b == NULL)
    3514             :                 return LOG_ERR;
    3515             : 
    3516       11882 :         sqlstore *store = tr->store;
    3517       11882 :         bat_set_access(b, BAT_READ);
                               /* ok is still LOG_OK here, so this first test always passes */
    3518       11882 :         if (ok == LOG_OK)
    3519       11882 :                 ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
    3520       11882 :         if (ok == LOG_OK)
    3521       11882 :                 ok = log_segments(tr, bat->segs, t->base.id);
    3522       11882 :         bat_destroy(b);
    3523       11882 :         return ok;
    3524             : }
    3525             : 
    /*
     * log_create_del: log callback for a created table (change->obj is the
     * sql_table).
     *
     * Returns LOG_OK immediately when t->base.deleted is set. Otherwise
     * logs the table storage with log_create_storage(), then every column
     * with log_create_col_(), then every index that has data with
     * log_create_idx_(). The loops stop at the first failure and that
     * result is returned. Asserts that the table is not temporary.
     */
    3526             : static int
    3527       11924 : log_create_del(sql_trans *tr, sql_change *change)
    3528             : {
    3529       11924 :         int ok = LOG_OK;
    3530       11924 :         sql_table *t = (sql_table*)change->obj;
    3531             : 
    3532       11924 :         if (t->base.deleted)
    3533             :                 return ok;
    3534       11914 :         assert(!isTempTable(t));
    3535       11914 :         ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
    3536       11914 :         if (ok == LOG_OK) {
    3537       71532 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3538       59618 :                         sql_column *c = n->data;
    3539             : 
    3540       59618 :                         ok = log_create_col_(tr, c);
    3541             :                 }
    3542       11914 :                 if (t->idxs) {
    3543       17560 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3544        5646 :                                 sql_idx *i = n->data;
    3545             : 
                                                       /* indexes without data are skipped */
    3546        5646 :                                 if (ATOMIC_PTR_GET(&i->data))
    3547        5634 :                                         ok = log_create_idx_(tr, i);
    3548             :                         }
    3549             :                 }
    3550             :         }
    3551             :         return ok;
    3552             : }
    3553             : 
    /*
     * commit_create_del: commit callback for a created table (change->obj
     * is the sql_table).
     *
     * - Tables with commit action CA_DELETE or CA_DROP (asserted to be
     *   temporary) have their storage cleared with clear_storage(); when
     *   that succeeds and commit_ts is non-zero the head segment is
     *   stamped with commit_ts.
     * - With commit_ts == 0 nothing else is done (see the existing
     *   rollback remark below).
     * - Otherwise the segments are folded into the column storage with
     *   segments2cs(), merged with merge_segments() (commit_ts is passed
     *   in place of `oldest`), the storage is stamped with commit_ts, and
     *   commit_create_delta() is run for every column and every index
     *   that has a delta. t->base.new is cleared when the transaction has
     *   no parent.
     *
     * Returns LOG_OK or the first failing helper's result.
     */
    3554             : static int
    3555       19834 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3556             : {
    3557       19834 :         int ok = LOG_OK;
    3558       19834 :         sql_table *t = (sql_table*)change->obj;
    3559       19834 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    3560             : 
    3561       19834 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    3562         118 :                 assert(isTempTable(t));
    3563         118 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    3564         118 :                         if (commit_ts) dbat->segs->h->ts = commit_ts;
    3565         118 :                 return ok;
    3566             :         }
    3567             : 
    3568       19716 :         if (!commit_ts) /* rollback handled by ? */
    3569             :                 return ok;
    3570       18017 :         ok = segments2cs(tr, dbat->segs, &dbat->cs);
    3571       18017 :         assert(ok == LOG_OK);
    3572       18017 :         if (ok != LOG_OK)
    3573             :                 return ok;
    3574       18017 :         merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
    3575       18017 :         assert(dbat->cs.ts == tr->tid);
    3576       18017 :         dbat->cs.ts = commit_ts;
    3577       18017 :         if (ok == LOG_OK) {
    3578      124165 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
    3579      106148 :                         sql_column *c = n->data;
    3580      106148 :                         sql_delta *delta = ATOMIC_PTR_GET(&c->data);
    3581             : 
    3582      106148 :                         ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
    3583             :                 }
    3584       18017 :                 if (t->idxs) {
    3585       23676 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
    3586        5659 :                                 sql_idx *i = n->data;
    3587        5659 :                                 sql_delta *delta = ATOMIC_PTR_GET(&i->data);
    3588             : 
    3589        5659 :                                 if (delta)
    3590        5647 :                                         ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
    3591             :                         }
    3592             :                 }
    3593       18017 :                 if (!tr->parent)
    3594       18015 :                         t->base.new = 0;
    3595             :         }
                               /* NOTE(review): same assignment as a few lines above; this copy also
                                * runs when a column/index commit failed -- one of the two is
                                * presumably redundant. */
    3596       18017 :         if (!tr->parent)
    3597       18015 :                 t->base.new = 0;
    3598             :         return ok;
    3599             : }
    3600             : 
    /*
     * log_destroy_delta: tell the logger that the BATs of delta `b` are no
     * longer persistent.
     *
     * Unless GDKinmemory(0) is true: if `b` has a bid, log_bat_transient()
     * is called for `id`; if that succeeded and `b` has an ebid, it is
     * called again for `-id` (the negated id under which create_col()
     * looks up the "DICT" BAT). A NULL `b` logs nothing.
     *
     * Returns LOG_OK when every call made returned GDK_SUCCEED, else
     * LOG_ERR.
     */
    3601             : static int
    3602       17876 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
    3603             : {
    3604       17876 :         gdk_return ok = GDK_SUCCEED;
    3605             : 
    3606       17876 :         sqlstore *store = tr->store;
    3607       17876 :         if (!GDKinmemory(0) && b && b->cs.bid)
    3608       17873 :                 ok = log_bat_transient(store->logger, id);
    3609       17876 :         if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
    3610          25 :                 ok = log_bat_transient(store->logger, -id);
    3611       17876 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3612             : }
    3613             : 
    /*
     * destroy_col: drop the delta attached to column `c`.
     *
     * Calls destroy_delta(c->data, true) when c->data is set, then resets
     * c->data to NULL. `store` is unused. Always returns LOG_OK.
     */
    3614             : static int
    3615      164878 : destroy_col(sqlstore *store, sql_column *c)
    3616             : {
    3617      164878 :         (void)store;
    3618      164878 :         if (ATOMIC_PTR_GET(&c->data))
    3619      164878 :                 destroy_delta(ATOMIC_PTR_GET(&c->data), true);
    3620      164878 :         ATOMIC_PTR_SET(&c->data, NULL);
    3621      164878 :         return LOG_OK;
    3622             : }
    3623             : 
    /*
     * log_destroy_col_: log the removal of column `c`.
     *
     * Only when the transaction has no parent is log_destroy_delta() called
     * with the column's delta and id; otherwise nothing is logged. Asserts
     * that the column's table is not temporary. Returns LOG_OK or the
     * result of log_destroy_delta().
     */
    3624             : static int
    3625       16042 : log_destroy_col_(sql_trans *tr, sql_column *c)
    3626             : {
    3627       16042 :         int ok = LOG_OK;
    3628       16042 :         assert(!isTempTable(c->t));
    3629       16042 :         if (!tr->parent) /* don't write save point commits */
    3630       16042 :                 ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
    3631       16042 :         return ok;
    3632             : }
    3633             : /* Log callback registered by drop_col(): log the destruction of the column in change->obj, then destroy that column. */
    3634             : static int
    3635          49 : log_destroy_col(sql_trans *tr, sql_change *change)
    3636             : {
    3637          49 :         sql_column *c = (sql_column*)change->obj;
    3638          49 :         int res = log_destroy_col_(tr, c);
    3639          49 :         change->obj = NULL; /* detach the column from the change before it is destroyed */
    3640          49 :         column_destroy(tr->store, c); /* runs even when logging failed; res is still returned */
    3641          49 :         return res;
    3642             : }
    3643             : /* Destroy the delta attached to index `i` (if any) and reset i->data to NULL. Always returns LOG_OK. */
    3644             : static int
    3645       10632 : destroy_idx(sqlstore *store, sql_idx *i)
    3646             : {
    3647       10632 :         (void)store; /* parameter is unused */
    3648       10632 :         if (ATOMIC_PTR_GET(&i->data))
    3649       10632 :                 destroy_delta(ATOMIC_PTR_GET(&i->data), true);
    3650       10632 :         ATOMIC_PTR_SET(&i->data, NULL); /* cleared unconditionally, also when there was nothing to destroy */
    3651       10632 :         return LOG_OK;
    3652             : }
    3653             : /* Log the destruction of index `i`'s delta. Asserts that the table is not temporary. Returns LOG_OK or LOG_ERR. */
    3654             : static int
    3655        1896 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
    3656             : {
    3657        1896 :         int ok = LOG_OK;
    3658        1896 :         assert(!isTempTable(i->t));
    3659        1896 :         if (ATOMIC_PTR_GET(&i->data)) { /* an index without a delta has nothing to log */
    3660        1834 :                 if (!tr->parent) /* don't write save point commits */
    3661        1834 :                         ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
    3662             :         }
    3663        1896 :         return ok;
    3664             : }
    3665             : /* Log callback registered by drop_idx(): log the destruction of the index in change->obj, then destroy that index. */
    3666             : static int
    3667         496 : log_destroy_idx(sql_trans *tr, sql_change *change)
    3668             : {
    3669         496 :         sql_idx *i = (sql_idx*)change->obj;
    3670         496 :         int res = log_destroy_idx_(tr, i);
    3671         496 :         change->obj = NULL; /* detach the index from the change before it is destroyed */
    3672         496 :         idx_destroy(tr->store, i); /* runs even when logging failed; res is still returned */
    3673         496 :         return res;
    3674             : }
    3675             : /* Destroy the storage attached to table `t` (if any) and reset t->data to NULL. Always returns LOG_OK. */
    3676             : static int
    3677       26099 : destroy_del(sqlstore *store, sql_table *t)
    3678             : {
    3679       26099 :         (void)store; /* parameter is unused */
    3680       26099 :         if (ATOMIC_PTR_GET(&t->data))
    3681       26089 :                 destroy_storage(ATOMIC_PTR_GET(&t->data));
    3682       26099 :         ATOMIC_PTR_SET(&t->data, NULL); /* cleared unconditionally, also when there was nothing to destroy */
    3683       26099 :         return LOG_OK;
    3684             : }
    3685             : /* Log the BAT of table storage `bat` as transient under `id`. Returns LOG_OK (also when nothing is logged) or LOG_ERR. */
    3686             : static int
    3687        2975 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
    3688             : {
    3689        2975 :         gdk_return ok = GDK_SUCCEED;
    3690             : 
    3691        2975 :         sqlstore *store = tr->store;
    3692        2975 :         if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
    3693        2975 :             bat && bat->cs.bid) /* a NULL storage or one without a bid is skipped */
    3694        2975 :                 ok = log_bat_transient(store->logger, id);
    3695        2975 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR; /* map the gdk_return onto the LOG_* codes */
    3696             : }
    3697             : /* Log callback registered by drop_del(): log the destruction of a table's storage, then of all its columns and indexes. */
    3698             : static int
    3699        2975 : log_destroy_del(sql_trans *tr, sql_change *change)
    3700             : {
    3701        2975 :         int ok = LOG_OK;
    3702        2975 :         sql_table *t = (sql_table*)change->obj;
    3703             : 
    3704        2975 :         assert(!isTempTable(t));
    3705        2975 :         ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
    3706        2975 :         if (ok == LOG_OK) { /* columns and indexes are only logged once the table storage was logged */
    3707       18968 :                 for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) { /* stops at the first failure */
    3708       15993 :                         sql_column *c = n->data;
    3709             : 
    3710       15993 :                         ok = log_destroy_col_(tr, c);
    3711             :                 }
    3712        2975 :                 if (t->idxs) {
    3713        4375 :                         for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) { /* not entered if a column already failed */
    3714        1400 :                                 sql_idx *i = n->data;
    3715             : 
    3716        1400 :                                 ok = log_destroy_idx_(tr, i);
    3717             :                         }
    3718             :                 }
    3719             :         }
    3720        2975 :         return ok; /* LOG_OK, or the first error encountered */
    3721             : }
    3722             : /* Commit callback shared by drop_del/drop_col/drop_idx: marks the change as handled when commit_ts is non-zero. Always returns 0. */
    3723             : static int
    3724        3600 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    3725             : {
    3726        3600 :         (void)tr;
    3727        3600 :         (void)change; /* NOTE(review): change and commit_ts are in fact used below; these two casts are redundant */
    3728        3600 :         (void)commit_ts;
    3729        3600 :         (void)oldest;
    3730        3600 :         if (commit_ts) /* a zero commit_ts leaves change->handled untouched */
    3731        3586 :                 change->handled = true;
    3732        3600 :         return 0;
    3733             : }
    3734             : /* Register the drop of table `t` with the transaction, unless the table is new (isNew). Always returns LOG_OK. */
    3735             : static int
    3736        3047 : drop_del(sql_trans *tr, sql_table *t)
    3737             : {
    3738        3047 :         int ok = LOG_OK;
    3739             : 
    3740        3047 :         if (!isNew(t)) {
    3741        3047 :                 storage *bat = ATOMIC_PTR_GET(&t->data);
    3742        3113 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_destroy_del); /* no log callback for unlogged/temp tables; NOTE(review): the result of trans_add_obj is not checked */
    3743             :         }
    3744        3047 :         return ok;
    3745             : }
    3746             : /* Register the drop of column `c` with the transaction. The column must not be new (asserted). Always returns LOG_OK. */
    3747             : static int
    3748          56 : drop_col(sql_trans *tr, sql_column *c)
    3749             : {
    3750          56 :         assert(!isNew(c));
    3751          56 :         sql_delta *d = ATOMIC_PTR_GET(&c->data);
    3752          56 :         trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_destroy_col); /* no log callback for unlogged/temp tables */
    3753          56 :         return LOG_OK;
    3754             : }
    3755             : /* Register the drop of index `i` with the transaction. The index must not be new (asserted). Always returns LOG_OK. */
    3756             : static int
    3757         497 : drop_idx(sql_trans *tr, sql_idx *i)
    3758             : {
    3759         497 :         assert(!isNew(i));
    3760         497 :         sql_delta *d = ATOMIC_PTR_GET(&i->data);
    3761         497 :         trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_destroy_idx); /* no log callback for unlogged/temp tables */
    3762         497 :         return LOG_OK;
    3763             : }
    3764             : 
    3765             : /* Reset a column_storage: with `renew`, swap cs->bid for an empty BAT; always drop the update BATs and mark it cleared. Returns the old row count (0 without renew) or BUN_NONE on failure. */
    3766             : static BUN
    3767      129718 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
    3768             : {
    3769      129718 :         BAT *b;
    3770      129718 :         BUN sz = 0;
    3771             : 
    3772      129718 :         (void)tr; /* parameter is unused */
    3773      129718 :         assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
    3774      129718 :         if (cs->bid && renew) {
    3775      129757 :                 b = quick_descriptor(cs->bid); /* only used to read the current count */
    3776      129719 :                 if (b) {
    3777      129719 :                         sz += BATcount(b);
    3778      129719 :                         if (cs->st == ST_DICT) { /* ST_DICT: both cs->ebid and cs->bid are replaced */
    3779           2 :                                 bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
    3780           2 :                                 BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
    3781             : 
    3782           2 :                                 if (nebid == BID_NIL || !n) {
    3783           0 :                                         temp_destroy(nebid); /* presumably tolerates BID_NIL, as bat_destroy presumably tolerates NULL - TODO confirm */
    3784           0 :                                         bat_destroy(n);
    3785           0 :                                         return BUN_NONE;
    3786             :                                 }
    3787           2 :                                 temp_destroy(cs->ebid);
    3788           2 :                                 cs->ebid = nebid;
    3789           2 :                                 if (!temp)
    3790           2 :                                         bat_set_access(n, BAT_READ);
    3791           2 :                                 temp_destroy(cs->bid);
    3792           2 :                                 cs->bid = temp_create(n); /* create empty copy */
    3793           2 :                                 bat_destroy(n);
    3794             :                         } else {
    3795      129717 :                                 bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */ /* NOTE(review): passes false where the ST_DICT branch passes `temp` - confirm this is intended */
    3796             : 
    3797      129453 :                                 if (nbid == BID_NIL)
    3798             :                                         return BUN_NONE;
    3799      129453 :                                 temp_destroy(cs->bid);
    3800      129603 :                                 cs->bid = nbid;
    3801             :                         }
    3802             :                 } else {
    3803             :                         return BUN_NONE; /* no descriptor for cs->bid */
    3804             :                 }
    3805             :         }
    3806      129566 :         if (cs->uibid) { /* the update BATs are dropped regardless of `renew` */
    3807      129423 :                 temp_destroy(cs->uibid);
    3808      129642 :                 cs->uibid = 0;
    3809             :         }
    3810      129785 :         if (cs->uvbid) {
    3811      129643 :                 temp_destroy(cs->uvbid);
    3812      129646 :                 cs->uvbid = 0;
    3813             :         }
    3814      129788 :         cs->cleared = true; /* tr_log_cs and log_table_append test this flag */
    3815      129788 :         cs->ucnt = 0;
    3816      129788 :         return sz;
    3817             : }
    3818             : /* Clear the data of column `c` for transaction `tr`. Returns the cleared row count, BUN_NONE on error, or BUN_NONE - 1 on an update conflict. */
    3819             : static BUN
    3820      129599 : clear_col(sql_trans *tr, sql_column *c, bool renew)
    3821             : {
    3822      129599 :         bool update_conflict = false;
    3823      129599 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data); /* remember the delta seen before binding */
    3824             : 
    3825      129599 :         if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL) /* the conflict flag is only passed when renewing */
    3826           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3827      129639 :         assert(c->t->persistence != SQL_DECLARED_TABLE);
    3828      129639 :         if (odelta != delta) /* binding returned a different delta: register it with the transaction */
    3829      129644 :                 trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
    3830      129624 :         if (delta) /* always true here: the NULL case returned above */
    3831      129624 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
    3832             :         return 0;
    3833             : }
    3834             : /* Clear the data of index `i` for transaction `tr`. Returns the cleared row count, 0 for skipped indexes, BUN_NONE on error, or BUN_NONE - 1 on an update conflict. */
    3835             : static BUN
    3836          21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
    3837             : {
    3838          21 :         bool update_conflict = false;
    3839          21 :         sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data); /* remember the delta seen before binding */
    3840             : 
    3841          21 :         if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type)) /* same skip test as used for indexes in log_table_append, plus the isTable check */
    3842          15 :                 return 0;
    3843           6 :         if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL) /* the conflict flag is only passed when renewing */
    3844           0 :                 return update_conflict ? BUN_NONE - 1 : BUN_NONE;
    3845           6 :         assert(i->t->persistence != SQL_DECLARED_TABLE);
    3846           6 :         if (odelta != delta) /* binding returned a different delta: register it with the transaction */
    3847           6 :                 trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
    3848           6 :         if (delta) /* always true here: the NULL case returned above */
    3849           6 :                 return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
    3850             :         return 0;
    3851             : }
    3852             : /* Reset table storage `s`: renew its column_storage and replace its segment list with a fresh one. Returns LOG_OK or LOG_ERR. */
    3853             : static int
    3854         140 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
    3855             : {
    3856         140 :         if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE) /* renew is always requested here */
    3857             :                 return LOG_ERR;
    3858         140 :         if (s->segs)
    3859         140 :                 destroy_segments(s->segs);
    3860         140 :         if (!(s->segs = new_segments(tr, 0))) /* on failure s->segs ends up NULL; the old list is already destroyed */
    3861             :                 return LOG_ERR;
    3862             :         return LOG_OK;
    3863             : }
    3864             : 
    3865             : 
    3866             : /*
    3867             :  * Clear the table. In general this means replacing the storage,
    3868             :  * but in case of earlier deletes (or inserts by this transaction) we only mark
    3869             :  * all segments as deleted.
    3870             :  * This function returns BUN_NONE on LOG_ERR, BUN_NONE - 1 on LOG_CONFLICT and LOG_OK otherwise.
    3871             :  */
    3872             : static BUN
    3873       41823 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
    3874             : {
    3875       41823 :         int clear = !in_transaction, ok = LOG_OK;
    3876       41823 :         bool conflict = false;
    3877       41823 :         storage *bat;
    3878             : 
    3879       41874 :         if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL) /* the conflict flag is only passed on the clear path */
    3880       15762 :                 return conflict?BUN_NONE-1:BUN_NONE;
    3881             : 
    3882       26060 :         if (!clear) { /* in-transaction path: delete the range [0, end of the segs->t segment) under the table lock */
    3883          51 :                 lock_table(tr->store, t->base.id);
    3884          51 :                 ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
    3885          51 :                 unlock_table(tr->store, t->base.id);
    3886             :         }
    3887       26060 :         assert(t->persistence != SQL_DECLARED_TABLE);
    3888       26060 :         if (!in_transaction) /* the change is only registered here on the clear path */
    3889       26012 :                 trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    3890       26058 :         if (ok == LOG_ERR) /* ok can only differ from LOG_OK through delete_range above */
    3891             :                 return BUN_NONE;
    3892       26058 :         if (ok == LOG_CONFLICT)
    3893           0 :                 return BUN_NONE - 1;
    3894             :         return LOG_OK;
    3895             : }
    3896             : /* Clear table `t`: its deletes/storage first, then (unless handled in-transaction) every column and every index that has a column. On success returns the active row count taken before clearing. */
    3897             : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
    3898             : static BUN
    3899       41802 : clear_table(sql_trans *tr, sql_table *t)
    3900             : {
    3901       41802 :         node *n = ol_first_node(t->columns);
    3902       41802 :         sql_column *c = n->data; /* NOTE(review): n is dereferenced unchecked - assumes the table has at least one column */
    3903       41802 :         storage *d = tab_timestamp_storage(tr, t);
    3904       41820 :         int in_transaction, clear;
    3905       41820 :         BUN sz, clear_ok;
    3906             : 
    3907       41820 :         if (!d)
    3908             :                 return BUN_NONE;
    3909       41820 :         in_transaction = segments_in_transaction(tr, t);
    3910       41820 :         clear = !in_transaction;
    3911       41820 :         sz = count_col(tr, c, CNT_ACTIVE); /* counted on the first column, before anything is cleared */
    3912       41823 :         if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1) /* matches both BUN_NONE - 1 (conflict) and BUN_NONE (error) */
    3913             :                 return clear_ok;
    3914             : 
    3915       26057 :         if (in_transaction) /* clear_del already ran delete_range; columns and indexes are left untouched */
    3916             :                 return sz;
    3917             : 
    3918      155645 :         for (; n; n = n->next) { /* from here on `clear` is always true */
    3919      129636 :                 c = n->data;
    3920             : 
    3921      129636 :                 if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
    3922           0 :                         return clear_ok;
    3923             :         }
    3924       26009 :         if (t->idxs) {
    3925       26030 :                 for (n = ol_first_node(t->idxs); n; n = n->next) {
    3926          21 :                         sql_idx *ci = n->data;
    3927             : 
    3928          21 :                         if (isTable(ci->t) && idx_has_column(ci->type) && /* clear_idx repeats these checks and may still skip the index */
    3929          21 :                                 (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
    3930           0 :                                 return clear_ok;
    3931             :                 }
    3932             :         }
    3933             :         return sz;
    3934             : }
    3935             : /* Log one column_storage under `id`: a cleared storage is logged as persistent BAT(s), otherwise only its pending updates are logged. Returns LOG_OK or LOG_ERR. */
    3936             : static int
    3937      158997 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
    3938             : {
    3939      158997 :         sqlstore *store = tr->store;
    3940      158997 :         gdk_return ok = GDK_SUCCEED;
    3941             : 
    3942      158997 :         (void) t; /* t is only used inside the assert further down */
    3943      158997 :         (void) segs; /* parameter is unused */
    3944      158997 :         if (GDKinmemory(0)) /* nothing is logged in this mode */
    3945             :                 return LOG_OK;
    3946             : 
    3947      158990 :         if (cs->cleared) { /* cleared storage: log cs->bid (and cs->ebid, if set) as persistent */
    3948      155702 :                 assert(cs->ucnt == 0);
    3949      155702 :                 BAT *ins = temp_descriptor(cs->bid);
    3950      155702 :                 if (!ins)
    3951             :                         return LOG_ERR;
    3952      155702 :                 assert(!isEbat(ins));
    3953      155702 :                 bat_set_access(ins, BAT_READ);
    3954      155702 :                 ok = log_bat_persists(store->logger, ins, id);
    3955      155702 :                 bat_destroy(ins);
    3956      155702 :                 if (ok == GDK_SUCCEED && cs->ebid) { /* the ebid BAT is logged under the negated id */
    3957          56 :                         BAT *ins = temp_descriptor(cs->ebid);
    3958          56 :                         if (!ins)
    3959             :                                 return LOG_ERR;
    3960          56 :                         assert(!isEbat(ins));
    3961          56 :                         bat_set_access(ins, BAT_READ);
    3962          56 :                         ok = log_bat_persists(store->logger, ins, -id);
    3963          56 :                         bat_destroy(ins);
    3964             :                 }
    3965      155702 :                 return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3966             :         }
    3967             : 
    3968        3288 :         assert(!isTempTable(t));
    3969             : 
    3970        3288 :         if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) { /* not cleared: log the pending updates, if any */
    3971        2717 :                 BAT *ui = temp_descriptor(cs->uibid);
    3972        2717 :                 BAT *uv = temp_descriptor(cs->uvbid);
    3973             :                 /* any updates */
    3974        2717 :                 if (ui == NULL || uv == NULL) {
    3975             :                         ok = GDK_FAIL;
    3976        2717 :                 } else if (BATcount(uv) > uv->batInserted || BATdirty(uv)) /* skipped when uv has nothing beyond batInserted and is not dirty */
    3977        2717 :                         ok = log_delta(store->logger, ui, uv, id);
    3978        2717 :                 bat_destroy(ui); /* reached with a NULL ui/uv on the failure path; bat_destroy presumably accepts NULL - TODO confirm */
    3979        2717 :                 bat_destroy(uv);
    3980             :         }
    3981        2717 :         return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
    3982             : }
    3983             : /* Open a logger BAT group for table `t` via log_bat_group_start. Returns LOG_OK or LOG_ERR. */
    3984             : static inline int
    3985       56215 : tr_log_table_start(sql_trans *tr, sql_table *t) {
    3986       56215 :         sqlstore *store = tr->store;
    3987       56215 :         return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR; /* map the gdk_return onto the LOG_* codes */
    3988             : }
    3989             : /* Close the logger BAT group for table `t` via log_bat_group_end. Returns LOG_OK or LOG_ERR. */
    3990             : static inline int
    3991       56215 : tr_log_table_end(sql_trans *tr, sql_table *t) {
    3992       56215 :         sqlstore *store = tr->store;
    3993       56215 :         return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR; /* map the gdk_return onto the LOG_* codes */
    3994             : }
    3995             : /* Log what transaction `tr` appended to table `t`: its own segments, then the matching row ranges of every column and of every index that has a column, inside one logger BAT group. Returns LOG_OK or LOG_ERR. */
    3996             : static int
    3997       56215 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
    3998             : {
    3999       56215 :         sqlstore *store = tr->store;
    4000       56215 :         gdk_return ok = GDK_SUCCEED;
    4001             : 
    4002       56215 :         size_t end = segs_end(segs, tr, t); /* upper bound applied to segment starts in the column/index loops */
    4003             : 
    4004       56215 :         if (tr_log_table_start(tr, t) != LOG_OK) /* NOTE(review): the early `return LOG_ERR` paths below skip tr_log_table_end - confirm the logger copes with an unclosed group */
    4005             :                 return LOG_ERR;
    4006             : 
    4007       56215 :         size_t nr_appends = 0; /* total rows in this transaction's non-deleted segments */
    4008             : 
    4009       56215 :         lock_table(tr->store, t->base.id); /* pattern used by all segment loops here: lock held while stepping to the next segment, released while processing one */
    4010      374106 :         for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
    4011      317891 :                 unlock_table(tr->store, t->base.id);
    4012             : 
    4013      317891 :                 if (seg->ts == tr->tid && seg->end-seg->start) { /* non-empty segment stamped with this transaction's id */
    4014      114183 :                         if (!seg->deleted) {
    4015       45743 :                                 if (log_segment(tr, seg, t->base.id) != LOG_OK)
    4016             :                                         return LOG_ERR; /* the table lock is not held at this point */
    4017             : 
    4018       45743 :                                 nr_appends += (seg->end - seg->start);
    4019             :                         }
    4020             :                 }
    4021      317891 :                 lock_table(tr->store, t->base.id);
    4022             :         }
    4023       56215 :         unlock_table(tr->store, t->base.id);
    4024             : 
    4025      385212 :         for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
    4026      328997 :                 sql_column *c = n->data;
    4027      328997 :                 column_storage *cs = ATOMIC_PTR_GET(&c->data); /* c->data is read as sql_delta* elsewhere; presumably cs is the first member of sql_delta - TODO confirm */
    4028             : 
    4029      328997 :                 if (cs->cleared) { /* a cleared column is logged as a whole by tr_log_cs */
    4030           3 :                         ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4031           3 :                         continue;
    4032             :                 }
    4033             : 
    4034      328994 :                 lock_table(tr->store, t->base.id);
    4035      328994 :                 if (!cs->cleared) { /* same flag as tested above, re-read with the table lock held */
    4036     2192236 :                         for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4037     1863242 :                                 unlock_table(tr->store, t->base.id);
    4038     1863242 :                                 if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4039             :                                         /* append col*/
    4040      255359 :                                         BAT *ins = temp_descriptor(cs->bid);
    4041      255359 :                                         if (ins == NULL)
    4042             :                                                 return LOG_ERR; /* the table lock is not held at this point */
    4043      255359 :                                         assert(BATcount(ins) >= cur->end);
    4044      255359 :                                         ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends); /* logs the rows [cur->start, cur->end) of this column */
    4045      255359 :                                         bat_destroy(ins);
    4046             :                                 }
    4047     1863242 :                                 lock_table(tr->store, t->base.id);
    4048             :                         }
    4049             :                 }
    4050      328994 :                 unlock_table(tr->store, t->base.id);
    4051             : 
    4052      328994 :                 if (ok == GDK_SUCCEED && cs->ebid) { /* the ebid BAT is logged under the negated column id */
    4053          19 :                         BAT *ins = temp_descriptor(cs->ebid);
    4054          19 :                         if (ins == NULL)
    4055             :                                 return LOG_ERR;
    4056          19 :                         if (BATcount(ins) > ins->batInserted) /* only the entries beyond batInserted are logged */
    4057          17 :                                 ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
    4058          19 :                         BATcommit(ins, BATcount(ins)); /* called whether or not anything was logged, and regardless of ok */
    4059          19 :                         bat_destroy(ins);
    4060             :                 }
    4061             :         }
    4062             : 
    4063       56215 :         if (t->idxs) {
    4064       61387 :                 for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
    4065        5172 :                         sql_idx *i = n->data;
    4066             : 
    4067        5172 :                         if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type)) /* same skip test as in clear_idx */
    4068        4399 :                                 continue;
    4069         773 :                         column_storage *cs = ATOMIC_PTR_GET(&i->data);
    4070             : 
    4071         773 :                         if (cs) { /* unlike the column loop, a missing storage is tolerated here */
    4072         773 :                                 if (cs->cleared) {
    4073           0 :                                         ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
    4074           0 :                                         continue;
    4075             :                                 }
    4076             : 
    4077         773 :                                 lock_table(tr->store, t->base.id);
    4078        2372 :                                 for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
    4079        1599 :                                         unlock_table(tr->store, t->base.id);
    4080        1599 :                                         if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
    4081             :                                                 /* append idx */
    4082         725 :                                                 BAT *ins = temp_descriptor(cs->bid);
    4083         725 :                                                 if (ins == NULL)
    4084             :                                                         return LOG_ERR; /* the table lock is not held at this point */
    4085         725 :                                                 assert(BATcount(ins) >= cur->end);
    4086         725 :                                                 ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
    4087         725 :                                                 bat_destroy(ins);
    4088             :                                         }
    4089        1599 :                                         lock_table(tr->store, t->base.id);
    4090             :                                 }
    4091         773 :                                 unlock_table(tr->store, t->base.id);
    4092             :                         }
    4093             :                 }
    4094             :         }
    4095             : 
    4096       56215 :         if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK) /* the group is only closed when everything before succeeded */
    4097           0 :                 return LOG_ERR;
    4098             : 
    4099             :         return LOG_OK;
    4100             : }
    4101             : /* Log table storage `s`: a cleared storage via tr_log_cs, then the segments, and for a non-cleared storage also the appended rows. Returns LOG_OK or the first error. */
    4102             : static int
    4103       82218 : log_storage(sql_trans *tr, sql_table *t, storage *s)
    4104             : {
    4105       82218 :         int ok = LOG_OK;
    4106       82218 :         bool cleared = s->cs.cleared; /* sampled once; both checks below use this snapshot */
    4107       82218 :         if (ok == LOG_OK && cleared)
    4108       26003 :                 ok =  tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
    4109       26003 :         if (ok == LOG_OK)
    4110       82218 :                 ok = log_segments(tr, s->segs, t->base.id);
    4111       82218 :         if (ok == LOG_OK && !cleared) /* appends are only logged for storage that was not cleared */
    4112       56215 :                 ok = log_table_append(tr, t, s->segs);
    4113       82218 :         return ok;
    4114             : }
    4115             : /* Merge the pending updates of `cs` into its base BAT and reset the update BATs; always ends with cleared=false and merged=true. Failures go to GDKfatal with `caller` in the message. */
    4116             : static void
    4117      309982 : merge_cs( column_storage *cs, const char* caller)
    4118             : {
    4119      309982 :         if (cs->bid && cs->ucnt) { /* only when there is a base BAT and at least one pending update */
    4120        2725 :                 BAT *cur = temp_descriptor(cs->bid);
    4121        2725 :                 BAT *ui = temp_descriptor(cs->uibid);
    4122        2725 :                 BAT *uv = temp_descriptor(cs->uvbid);
    4123             : 
    4124        2725 :                 if (!cur || !ui || !uv) {
    4125           0 :                         bat_destroy(ui);
    4126           0 :                         bat_destroy(uv);
    4127           0 :                         bat_destroy(cur);
    4128           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4129             :                         return; /* presumably unreachable if GDKfatal does not return - TODO confirm */
    4130             :                 }
    4131        2725 :                 assert(BATcount(ui) == BATcount(uv));
    4132             : 
    4133             :                 /* any updates */
    4134        2725 :                 assert(!isEbat(cur));
    4135        2725 :                 if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) { /* apply the ui/uv update pair to the base BAT */
    4136           0 :                         bat_destroy(ui);
    4137           0 :                         bat_destroy(uv);
    4138           0 :                         bat_destroy(cur);
    4139           0 :                         GDKfatal(FATAL_MERGE_FAILURE, caller);
    4140             :                         return;
    4141             :                 }
    4142             :                 /* cleanup the old deltas */
    4143        2725 :                 temp_destroy(cs->uibid);
    4144        2725 :                 temp_destroy(cs->uvbid);
    4145        2725 :                 cs->uibid = e_bat(TYPE_oid); /* replacement update BATs come from e_bat() */
    4146        2725 :                 cs->uvbid = e_bat(cur->ttype);
    4147        2725 :                 assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
    4148        2725 :                 cs->ucnt = 0;
    4149        2725 :                 bat_destroy(ui);
    4150        2725 :                 bat_destroy(uv);
    4151        2725 :                 bat_destroy(cur);
    4152             :         }
    4153      309982 :         cs->cleared = false; /* both flags are set even when there was nothing to merge */
    4154      309982 :         cs->merged = true;
    4155      309982 :         return;
    4156             : }
    4157             : /* Merge delta `obat`: first recurse into obat->next while this delta is not yet marked merged, then merge its own column_storage. */
    4158             : static void
    4159      269928 : merge_delta( sql_delta *obat)
    4160             : {
    4161      269928 :         if (obat && obat->next && !obat->cs.merged) /* deltas further down the next chain are merged before this one */
    4162       28958 :                 merge_delta(obat->next);
    4163      269928 :         merge_cs(&obat->cs, __func__); /* NOTE(review): obat is NULL-checked above but used unconditionally here - confirm callers never pass NULL */
    4164      269928 : }
    4165             : /* Merge the column_storage of table storage `tdb`, then destroy and detach whatever hangs off tdb->next. */
    4166             : static void
    4167       40054 : merge_storage(storage *tdb)
    4168             : {
    4169       40054 :         merge_cs(&tdb->cs, __func__);
    4170             : 
    4171       40054 :         if (tdb->next) { /* unlike merge_delta, the next storage is destroyed rather than merged */
    4172         275 :                 destroy_storage(tdb->next);
    4173         275 :                 tdb->next = NULL;
    4174             :         }
    4175       40054 : }
    4176             : /* Savepoint commit of a delta: when delta and delta->next both carry commit_ts, fold delta into its successor and return the successor; otherwise return delta unchanged. */
    4177             : static sql_delta *
    4178           1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
    4179             : {
    4180             :         /* commit, i.e. copy back to the parent transaction */
    4181           1 :         if (delta && delta->cs.ts == commit_ts && delta->next) {
    4182           1 :                 sql_delta *od = delta->next;
    4183           1 :                 if (od->cs.ts == commit_ts) {
    4184           0 :                         sql_delta t = *od, *n = od->next; /* keep od's old contents and its next link */
    4185           0 :                         *od = *delta; /* od now holds the newer contents ... */
    4186           0 :                         od->next = n; /* ... but keeps its own position in the chain */
    4187           0 :                         *delta = t; /* delta now holds od's old contents */
    4188           0 :                         delta->next = NULL; /* detach so only this single node is destroyed */
    4189           0 :                         destroy_delta(delta, true);
    4190           0 :                         return od;
    4191             :                 }
    4192             :         }
    4193             :         return delta;
    4194             : }
    4195             : /* Log callback for a column change (change->obj is the sql_column). Returns LOG_OK or the result of tr_log_cs(). */
    4196             : static int
    4197      132965 : log_update_col( sql_trans *tr, sql_change *change)
    4198             : {
    4199      132965 :         sql_column *c = (sql_column*)change->obj;
    4200      132965 :         assert(!isTempTable(c->t));
    4201             :         /* a deleted table needs no logging; mark the change handled so commit skips it too */
    4202      132965 :         if (isDeleted(c->t)) {
    4203           0 :                 change->handled = true;
    4204           0 :                 return LOG_OK;
    4205             :         }
    4206             :         /* NOTE(review): the !isDeleted() test below is always true after the early return above */
    4207      132965 :         if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
    4208      132965 :                 storage *s = ATOMIC_PTR_GET(&c->t->data);
    4209      132965 :                 sql_delta *d = ATOMIC_PTR_GET(&c->data);
    4210      132965 :                 return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id); /* logs the column's current delta together with the table's first segment */
    4211             :         }
    4212             :         return LOG_OK;
    4213             : }
    4214             : /* Cleanup callback for a rolled-back delta (change->data). Returns 1 once the delta has been destroyed, 0 while it must be kept. */
    4215             : static int
    4216         207 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
    4217             : {
    4218         207 :         sqlstore *store = Store;
    4219             : 
    4220         207 :         sql_delta *d = (sql_delta*)change->data;
    4221         207 :         if (d->cs.ts < oldest) { /* timestamp is below 'oldest': destroy now */
    4222          78 :                 destroy_delta(d, false);
    4223          78 :                 if (change->commit == &commit_update_idx) /* the commit callback tells whether obj is an index or a column */
    4224           2 :                         table_destroy(store, ((sql_idx*)change->obj)->t);
    4225             :                 else
    4226          76 :                         table_destroy(store, ((sql_column*)change->obj)->t);
    4227          78 :                 return 1;
    4228             :         }
    4229         129 :         if (d->cs.ts > TRANSACTION_ID_BASE) /* still stamped with a value above TRANSACTION_ID_BASE: restamp with the next store timestamp */
    4230          78 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4231             :         return 0;
    4232             : }
    4233             : /* Cleanup callback for a rolled-back storage (change->data); storage counterpart of tc_gc_rollbacked. Returns 1 once destroyed, 0 otherwise. */
    4234             : static int
    4235           9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
    4236             : {
    4237           9 :         sqlstore *store = Store;
    4238             : 
    4239           9 :         storage *d = (storage*)change->data;
    4240           9 :         if (d->cs.ts < oldest) { /* timestamp is below 'oldest': destroy now */
    4241           3 :                 destroy_storage(d);
    4242           3 :                 table_destroy(store, (sql_table*)change->obj);
    4243           3 :                 return 1;
    4244             :         }
    4245           6 :         if (d->cs.ts > TRANSACTION_ID_BASE) /* restamp with the next store timestamp and try again later */
    4246           3 :                 d->cs.ts = store_get_timestamp(store) + 1;
    4247             :         return 0;
    4248             : }
    4249             : /* Shared commit/rollback body for commit_update_col and commit_update_idx; 'data' is the atomic head of the object's delta chain, commit_ts == 0 means rollback. Returns LOG_OK, or LOG_ERR when clear_cs() fails. */
    4250             : static int
    4251      133079 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
    4252             : {
    4253      133079 :         (void) type; // TODO transaction_layer_revamp remove if remains unused
    4254             : 
    4255      133079 :         sql_delta *delta = ATOMIC_PTR_GET(data);
    4256             :         /* tables with commit action DELETE/DROP (asserted to be temp tables): clear the column store instead of committing it */
    4257      133079 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4258           3 :                 int ok = LOG_OK;
    4259           3 :                 assert(isTempTable(t));
    4260           3 :                 if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
    4261           0 :                         ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
    4262           3 :                 if (!tr->parent)
    4263           0 :                         t->base.new = base->new = 0;
    4264           3 :                 change->handled = true;
    4265           3 :                 return ok;
    4266             :         }
    4267             : 
    4268      133076 :         if (commit_ts)
    4269      132998 :                 delta->cs.ts = commit_ts; /* stamp the head delta with the commit timestamp */
    4270      133076 :         if (!commit_ts) { /* rollback */
    4271          78 :                 sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
    4272             :                 /* unlink d (this change's delta) from the chain and hand it to tc_gc_rollbacked */
    4273          78 :                 if (change->ts && t->base.new) /* handled by create col */
    4274             :                         return LOG_OK;
    4275          78 :                 if (o != d) {
    4276           0 :                         while(o && o->next != d) /* search for d's predecessor */
    4277             :                                 o = o->next;
    4278             :                 }
    4279          78 :                 if (o == ATOMIC_PTR_GET(data)) /* NOTE(review): also true when the head's direct successor is d, in which case the head itself is dropped too - confirm intended */
    4280          78 :                         ATOMIC_PTR_SET(data, d->next);
    4281             :                 else
    4282           0 :                         o->next = d->next; /* NOTE(review): o is NULL here if d was not found in the chain */
    4283          78 :                 d->next = NULL;
    4284          78 :                 change->cleanup = &tc_gc_rollbacked;
    4285      132998 :         } else if (!tr->parent) {
    4286             :                 /* merge deltas */
    4287      274539 :                 while (delta && delta->cs.ts > oldest) /* skip deltas whose timestamp is still above 'oldest' */
    4288      141542 :                         delta = delta->next;
    4289      132997 :                 if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
    4290       25329 :                         lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
    4291       25329 :                         merge_delta(delta);
    4292       25329 :                         unlock_column(tr->store, base->id);
    4293             :                 }
    4294           1 :         } else if (tr->parent) /* move delta into older and cleanup current save points */
    4295           1 :                 ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
    4296             :         return LOG_OK;
    4297             : }
    4298             : /* Commit/rollback callback for a column change: unpacks the sql_column and forwards to commit_update_delta(). */
    4299             : static int
    4300      133051 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4301             : {
    4302             : 
    4303      133051 :         sql_column *c = (sql_column*)change->obj;
    4304      133051 :         sql_base* base = &c->base;
    4305      133051 :         sql_table* t = c->t;
    4306      133051 :         ATOMIC_PTR_TYPE* data = &c->data;
    4307      133051 :         int type = c->type.type->localtype; /* passed through; currently unused by commit_update_delta */
    4308             :         /* nothing to do when already handled (e.g. by the log callback) or when the table is deleted */
    4309      133051 :         if (change->handled || isDeleted(c->t))
    4310             :                 return LOG_OK;
    4311             : 
    4312      133051 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4313             : }
    4314             : /* Log callback for an index change (change->obj is the sql_idx); index counterpart of log_update_col. Returns LOG_OK or the result of tr_log_cs(). */
    4315             : static int
    4316          26 : log_update_idx( sql_trans *tr, sql_change *change)
    4317             : {
    4318          26 :         sql_idx *i = (sql_idx*)change->obj;
    4319          26 :         assert(!isTempTable(i->t));
    4320             :         /* a deleted table needs no logging; mark the change handled so commit skips it too */
    4321          26 :         if (isDeleted(i->t)) {
    4322           0 :                 change->handled = true;
    4323           0 :                 return LOG_OK;
    4324             :         }
    4325             :         /* NOTE(review): the !isDeleted() test below is always true after the early return above */
    4326          26 :         if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
    4327          26 :                 storage *s = ATOMIC_PTR_GET(&i->t->data);
    4328          26 :                 sql_delta *d = ATOMIC_PTR_GET(&i->data);
    4329          26 :                 return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
    4330             :         }
    4331             :         return LOG_OK;
    4332             : }
    4333             : /* Commit/rollback callback for an index change: unpacks the sql_idx and forwards to commit_update_delta(). */
    4334             : static int
    4335          28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4336             : {
    4337          28 :         sql_idx *i = (sql_idx*)change->obj;
    4338          28 :         sql_base* base = &i->base;
    4339          28 :         sql_table* t = i->t;
    4340          28 :         ATOMIC_PTR_TYPE* data = &i->data;
    4341          28 :         int type = (oid_index(i->type))?TYPE_oid:TYPE_lng; /* passed through; currently unused by commit_update_delta */
    4342             :         /* nothing to do when already handled or when the table is deleted */
    4343          28 :         if (change->handled || isDeleted(i->t))
    4344             :                 return LOG_OK;
    4345             : 
    4346          28 :         return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
    4347             : }
    4348             : /* Savepoint commit of a storage: when dbat and dbat->next both carry commit_ts, fold dbat into its successor and return the successor; otherwise return dbat unchanged. */
    4349             : static storage *
    4350          26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
    4351             : {
    4352          26 :         if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
    4353           0 :                 storage *od = dbat->next;
    4354           0 :                 if (od->cs.ts == commit_ts) {
    4355           0 :                         storage t = *od, *n = od->next; /* keep od's old contents and its next link */
    4356           0 :                         *od = *dbat; /* od now holds the newer contents ... */
    4357           0 :                         od->next = n; /* ... but keeps its own position in the chain */
    4358           0 :                         *dbat = t; /* dbat now holds od's old contents */
    4359           0 :                         dbat->next = NULL; /* detach before destroying */
    4360           0 :                         destroy_storage(dbat);
    4361           0 :                         return od;
    4362             :                 }
    4363             :         }
    4364             :         return dbat;
    4365             : }
    4366             : /* Log callback for a table's storage (change->obj is the sql_table). Returns LOG_OK or the result of log_storage(). */
    4367             : static int
    4368       82218 : log_update_del( sql_trans *tr, sql_change *change)
    4369             : {
    4370       82218 :         sql_table *t = (sql_table*)change->obj;
    4371       82218 :         assert(!isTempTable(t));
    4372             :         /* a deleted table needs no logging; mark the change handled so commit skips it too */
    4373       82218 :         if (isDeleted(t)) {
    4374           0 :                 change->handled = true;
    4375           0 :                 return LOG_OK;
    4376             :         }
    4377             :         /* NOTE(review): the !isDeleted() test below is always true after the early return above */
    4378       82218 :         if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
    4379       82218 :                 return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
    4380             :         return LOG_OK;
    4381             : }
    4382             : /* Commit/rollback callback for a table's storage (t->data); commit_ts == 0 means rollback. Returns LOG_OK or the error from clear_storage()/segments2cs(). */
    4383             : static int
    4384       86028 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
    4385             : {
    4386       86028 :         int ok = LOG_OK;
    4387       86028 :         sql_table *t = (sql_table*)change->obj;
    4388       86028 :         storage *dbat = ATOMIC_PTR_GET(&t->data);
    4389             :         /* nothing to do when already handled or when the table is deleted */
    4390       86028 :         if (change->handled || isDeleted(t))
    4391             :                 return ok;
    4392             :         /* tables with commit action DELETE/DROP (asserted to be temp tables): clear the storage instead of committing it */
    4393       86028 :         if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
    4394          22 :                 assert(isTempTable(t));
    4395          22 :                 if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
    4396          22 :                         if (commit_ts) dbat->segs->h->ts = commit_ts; /* on a real commit, stamp the first segment */
    4397          22 :                 change->handled = true;
    4398          22 :                 return ok;
    4399             :         }
    4400             :         /* everything below runs under the table lock; every return path must unlock */
    4401       86006 :         lock_table(tr->store, t->base.id);
    4402       86006 :         if (!commit_ts) { /* rollback */
    4403        3491 :                 if (dbat->cs.ts == tr->tid) { /* head storage is stamped with this transaction's id: unlink change->data from the chain */
    4404           6 :                         if (change->ts && t->base.new) { /* handled by the create table */
    4405           3 :                                 unlock_table(tr->store, t->base.id);
    4406           3 :                                 return ok;
    4407             :                         }
    4408           3 :                         storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
    4409             :                         /* search for d's predecessor when d is not the head */
    4410           3 :                         if (o != d) {
    4411           0 :                                 while(o && o->next != d)
    4412             :                                         o = o->next;
    4413             :                         }
    4414           3 :                         if (o == ATOMIC_PTR_GET(&t->data)) { /* NOTE(review): also true when the head's direct successor is d, which drops the head as well - confirm intended */
    4415           3 :                                 assert(d->next);
    4416           3 :                                 ATOMIC_PTR_SET(&t->data, d->next);
    4417             :                         } else
    4418           0 :                                 o->next = d->next; /* NOTE(review): o is NULL here if d was not found in the chain */
    4419           3 :                         d->next = NULL;
    4420           3 :                         change->cleanup = &tc_gc_rollbacked_storage; /* the detached storage is destroyed later by this callback */
    4421             :                 } else
    4422        3485 :                         rollback_segments(dbat->segs, tr, change, oldest);
    4423       82515 :         } else if (ok == LOG_OK && !tr->parent) { /* commit of a transaction without parent */
    4424       82489 :                 if (dbat->cs.ts == tr->tid) /* cleared table */
    4425       26005 :                         dbat->cs.ts = commit_ts;
    4426             : 
    4427       82489 :                 ok = segments2cs(tr, dbat->segs, &dbat->cs);
    4428       82489 :                 if (ok == LOG_OK) {
    4429       82489 :                         merge_segments(dbat, tr, change, commit_ts, oldest);
    4430       82489 :                         if (oldest == commit_ts) /* only merge the storage when this commit is itself the oldest timestamp */
    4431       40054 :                                 merge_storage(dbat);
    4432             :                 }
    4433       82489 :                 if (dbat) /* NOTE(review): dbat was already dereferenced above, so this check is always true here */
    4434       82489 :                         dbat->cs.cleared = false;
    4435          26 :         } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
    4436          26 :                 merge_segments(dbat, tr, change, commit_ts, oldest);
    4437          26 :                 ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
    4438          26 :                 storage *s = change->data;
    4439          26 :                 if (s->cs.ts == tr->tid) /* restamp this change's storage with the commit timestamp */
    4440           0 :                         s->cs.ts = commit_ts;
    4441             :         }
    4442       86003 :         unlock_table(tr->store, t->base.id);
    4443       86003 :         return ok;
    4444             : }
    4445             : 
    4446             : /* only rollback (content version) case for now; cleanup callback for a column change that releases the column via column_destroy(). Returns 1 when the change is done, 0 when cleanup must be retried later. */
    4447             : static int
    4448         194 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
    4449             : {
    4450         194 :         sqlstore *store = Store;
    4451         194 :         sql_column *c = (sql_column*)change->obj;
    4452             : 
    4453         194 :         if (!c) /* cleaned earlier */
    4454             :                 return 1;
    4455             : 
    4456         145 :         if (change->handled || isDeleted(c->t)) {
    4457           0 :                 column_destroy(store, c);
    4458           0 :                 return 1;
    4459             :         }
    4460             : 
    4461             :         /* savepoint commit (did it merge ?) */
    4462         145 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4463           0 :                 column_destroy(store, c);
    4464           0 :                 return 1;
    4465             :         }
    4466         145 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4467             :                 return 0;
    4468         144 :         sql_delta *d = (sql_delta*)change->data;
    4469         144 :         if (d && d->next) {
    4470             :                 /* NOTE(review): LOG_OK is returned below where sibling paths return a literal 0/1 - presumably LOG_OK == 0, confirm */
    4471          62 :                 if (d->cs.ts > oldest)
    4472             :                         return LOG_OK; /* cannot cleanup yet */
    4473             : 
    4474             :                 // d is oldest reachable delta
    4475          59 :                 if (d->next) // Unreachable can immediately be destroyed.
    4476          59 :                         destroy_delta(d->next, true);
    4477             : 
    4478          59 :                 d->next = NULL;
    4479          59 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4480          59 :                 merge_delta(d);
    4481          59 :                 unlock_column(store, c->base.id);
    4482             :         }
    4483         141 :         column_destroy(store, c);
    4484         141 :         return 1;
    4485             : }
    4486             : /* Cleanup callback for a column update; same flow as tc_gc_col but releases the owning table via table_destroy(). Returns 1 when done, 0 when cleanup must be retried later. */
    4487             : static int
    4488      788563 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
    4489             : {
    4490      788563 :         sqlstore *store = Store;
    4491      788563 :         sql_column *c = (sql_column*)change->obj;
    4492             : 
    4493      788563 :         if (!c) /* cleaned earlier */
    4494             :                 return 1;
    4495             : 
    4496      788563 :         if (change->handled || isDeleted(c->t)) {
    4497           3 :                 table_destroy(store, c->t);
    4498           3 :                 return 1;
    4499             :         }
    4500             : 
    4501             :         /* savepoint commit (did it merge ?) */
    4502      788560 :         if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
    4503       29845 :                 table_destroy(store, c->t);
    4504       29845 :                 return 1;
    4505             :         }
    4506      758715 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4507             :                 return 0;
    4508      758714 :         sql_delta *d = (sql_delta*)change->data;
    4509      758714 :         if (d && d->next) {
    4510             :                 /* NOTE(review): LOG_OK is returned below where sibling paths return a literal 0/1 - presumably LOG_OK == 0, confirm */
    4511      758714 :                 if (d->cs.ts > oldest)
    4512             :                         return LOG_OK; /* cannot cleanup yet */
    4513             : 
    4514             :                 // d is oldest reachable delta
    4515      103067 :                 if (d->next) // Unreachable can immediately be destroyed.
    4516      103067 :                         destroy_delta(d->next, true);
    4517             : 
    4518      103067 :                 d->next = NULL;
    4519      103067 :                 lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
    4520      103067 :                 merge_delta(d);
    4521      103067 :                 unlock_column(store, c->base.id);
    4522             :         }
    4523      103067 :         table_destroy(store, c->t);
    4524      103067 :         return 1;
    4525             : }
    4526             : /* Cleanup callback for an index change; index counterpart of tc_gc_col that releases the index via idx_destroy(). Returns 1 when done, 0 when cleanup must be retried later. */
    4527             : static int
    4528        1116 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
    4529             : {
    4530        1116 :         sqlstore *store = Store;
    4531        1116 :         sql_idx *i = (sql_idx*)change->obj;
    4532             : 
    4533        1116 :         if (!i) /* cleaned earlier */
    4534             :                 return 1;
    4535             : 
    4536         620 :         if (change->handled || isDeleted(i->t)) {
    4537           0 :                 idx_destroy(store, i);
    4538           0 :                 return 1;
    4539             :         }
    4540             : 
    4541             :         /* savepoint commit (did it merge ?) */
    4542         620 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4543           0 :                 idx_destroy(store, i);
    4544           0 :                 return 1;
    4545             :         }
    4546         620 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4547             :                 return 0;
    4548         620 :         sql_delta *d = (sql_delta*)change->data;
    4549         620 :         if (d->next) { /* NOTE(review): unlike tc_gc_col, d is not NULL-checked before this dereference */
    4550             : 
    4551           0 :                 if (d->cs.ts > oldest)
    4552             :                         return LOG_OK; /* cannot cleanup yet */
    4553             : 
    4554             :                 // d is oldest reachable delta
    4555           0 :                 if (d->next) // Unreachable can immediately be destroyed.
    4556           0 :                         destroy_delta(d->next, true);
    4557             : 
    4558           0 :                 d->next = NULL;
    4559           0 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4560           0 :                 merge_delta(d);
    4561           0 :                 unlock_column(store, i->base.id);
    4562             :         }
    4563         620 :         idx_destroy(store, i);
    4564         620 :         return 1;
    4565             : }
    4566             : /* Cleanup callback for an index update; same flow as tc_gc_idx but releases the owning table via table_destroy(). Returns 1 when done, 0 when cleanup must be retried later. */
    4567             : static int
    4568          26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
    4569             : {
    4570          26 :         sqlstore *store = Store;
    4571          26 :         sql_idx *i = (sql_idx*)change->obj;
    4572             : 
    4573          26 :         if (!i) /* cleaned earlier */
    4574             :                 return 1;
    4575             : 
    4576          26 :         if (change->handled || isDeleted(i->t)) {
    4577           0 :                 table_destroy(store, i->t);
    4578           0 :                 return 1;
    4579             :         }
    4580             : 
    4581             :         /* savepoint commit (did it merge ?) */
    4582          26 :         if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
    4583           0 :                 table_destroy(store, i->t);
    4584           0 :                 return 1;
    4585             :         }
    4586          26 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4587             :                 return 0;
    4588          26 :         sql_delta *d = (sql_delta*)change->data;
    4589          26 :         if (d->next) { /* NOTE(review): unlike tc_gc_upd_col, d is not NULL-checked before this dereference */
    4590             : 
    4591          26 :                 if (d->cs.ts > oldest)
    4592             :                         return LOG_OK; /* cannot cleanup yet */
    4593             : 
    4594             :                 // d is oldest reachable delta
    4595          26 :                 if (d->next) // Unreachable can immediately be destroyed.
    4596          26 :                         destroy_delta(d->next, true);
    4597             : 
    4598          26 :                 d->next = NULL;
    4599          26 :                 lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
    4600          26 :                 merge_delta(d);
    4601          26 :                 unlock_column(store, i->base.id);
    4602             :         }
    4603          26 :         table_destroy(store, i->t);
    4604          26 :         return 1;
    4605             : }
    4606             : /* Cleanup callback for a table's storage change: drops the storage chained behind change->data once it is old enough, then releases the table. Returns 1 when done, 0 when cleanup must be retried later. */
    4607             : static int
    4608      226993 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
    4609             : {
    4610      226993 :         sqlstore *store = Store;
    4611      226993 :         sql_table *t = (sql_table*)change->obj;
    4612             : 
    4613      226993 :         if (change->handled || isDeleted(t)) {
    4614        3213 :                 table_destroy(store, t);
    4615        3213 :                 return 1;
    4616             :         }
    4617             :         /* savepoint commit (did it merge ?) */
    4618      223780 :         if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
    4619        6061 :                 table_destroy(store, t);
    4620        6061 :                 return 1;
    4621             :         }
    4622      217719 :         if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
    4623             :                 return 0;
    4624      217691 :         storage *d = (storage*)change->data;
    4625      217691 :         if (d->next) {
    4626      150206 :                 if (d->cs.ts > oldest)
    4627             :                         return LOG_OK; /* cannot cleanup yet */
    4628             :                 /* d's timestamp is not above 'oldest': the storage chained behind it is destroyed */
    4629       19670 :                 destroy_storage(d->next);
    4630       19670 :                 d->next = NULL;
    4631             :         }
    4632       87155 :         table_destroy(store, t);
    4633       87155 :         return 1;
    4634             : }
    4635             : /* Record nr consecutive positions starting at 'slot': either as a single start in *offset (when nr == total and no BAT exists yet) or appended to the oid BAT *offsets. Returns LOG_OK, or LOG_ERR if COLnew() fails. */
    4636             : static int
    4637       30098 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
    4638             : {
    4639       30098 :         if (nr == 0)
    4640             :                 return LOG_OK;
    4641       30098 :         assert (nr > 0);
    4642       30098 :         if ((!offsets || !*offsets) && nr == total) { /* the whole request is one contiguous range: the start slot is enough */
    4643       30079 :                 *offset = slot;
    4644       30079 :                 return LOG_OK;
    4645             :         }
    4646          19 :         if (!*offsets) { /* NOTE(review): offsets itself may be NULL here (it is only NULL-checked in the test above) */
    4647           7 :                 *offsets = COLnew(0, TYPE_oid, total, SYSTRANS); /* created with capacity 'total' on first use */
    4648           7 :                 if (!*offsets)
    4649             :                         return LOG_ERR;
    4650             :         }
    4651          19 :         oid *restrict dst = Tloc(*offsets, BATcount(*offsets)); /* write position: just past the current count */
    4652       16158 :         for(size_t i = 0; i < nr; i++)
    4653       16139 :                 dst[i] = slot + i;
    4654          19 :         (*offsets)->batCount += nr; /* the count is bumped by hand because the values were written directly */
    4655          19 :         (*offsets)->theap->dirty = true;
    4656          19 :         return LOG_OK;
    4657             : }
    4658             : 
    4659             : static int
    4660       29918 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4661             : {
    4662       29918 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4663       30235 :         assert(s->segs);
    4664       30235 :         ulng oldest = store_oldest(tr->store, NULL);
    4665       29965 :         BUN slot = 0;
    4666       29965 :         size_t total = cnt;
    4667             : 
    4668       29965 :         if (!locked)
    4669       29967 :                 lock_table(tr->store, t->base.id);
    4670             :         /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
    4671       60421 :         for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4672       30308 :                 if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* re-use old deleted or rolledback append */
    4673          35 :                         if ((seg->end - seg->start) >= cnt) {
    4674             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4675          13 :                                 if (p && p->ts == tr->tid && !p->deleted) {
    4676           2 :                                         slot = p->end;
    4677           2 :                                         p->end += cnt;
    4678           2 :                                         seg->start += cnt;
    4679           2 :                                         if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
    4680             :                                                 ok = LOG_ERR;
    4681             :                                                 break;
    4682             :                                         }
    4683           2 :                                         cnt = 0;
    4684           2 :                                         break;
    4685             :                                 }
    4686             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4687          11 :                                 size_t rcnt = seg->end - seg->start;
    4688          11 :                                 if (rcnt > cnt)
    4689             :                                         rcnt = cnt;
    4690          11 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
    4691             :                                         ok = LOG_ERR;
    4692             :                                         break;
    4693             :                                 }
    4694             :                         }
    4695          33 :                         seg->ts = tr->tid;
    4696          33 :                         seg->deleted = false;
    4697          33 :                         slot = seg->start;
    4698          33 :                         if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
    4699             :                                 ok = LOG_ERR;
    4700             :                                 break;
    4701             :                         }
    4702           0 :                         cnt -= (seg->end - seg->start);
    4703             :                 }
    4704             :         }
    4705       30115 :         if (ok == LOG_OK && cnt) {
    4706       30102 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4707       29158 :                         slot = s->segs->t->end;
    4708       29158 :                         s->segs->t->end += cnt;
    4709             :                 } else {
    4710         944 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4711             :                                 ok = LOG_ERR;
    4712             :                         } else {
    4713        1059 :                                 if (!s->segs->h)
    4714           0 :                                         s->segs->h = s->segs->t;
    4715        1059 :                                 slot = s->segs->t->start;
    4716             :                         }
    4717             :                 }
    4718       30217 :                 if (ok == LOG_OK)
    4719       30217 :                         ok = add_offsets(slot, cnt, total, offset, offsets);
    4720             :         }
    4721       30107 :         if (!locked)
    4722       30033 :                 unlock_table(tr->store, t->base.id);
    4723             : 
    4724       30327 :         if (ok == LOG_OK) {
    4725             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4726       30256 :                 if (!in_transaction) {
    4727        1044 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4728        1044 :                         in_transaction = true;
    4729             :                 }
    4730       30256 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4731       30241 :                         tr->logchanges += (int) total;
    4732       30252 :                 if (*offsets) {
    4733           6 :                         BAT *pos = *offsets;
    4734           6 :                         assert(BATcount(pos) == total);
    4735           6 :                         BATsetcount(pos, total); /* set other properties */
    4736           7 :                         pos->tnil = false;
    4737           7 :                         pos->tnonil = true;
    4738           7 :                         pos->tkey = true;
    4739           7 :                         pos->tsorted = true;
    4740           7 :                         pos->trevsorted = false;
    4741             :                 }
    4742             :         }
    4743       30324 :         return ok;
    4744             : }
    4745             : 
    /*
     * Claim cnt slots in the segment list of storage s on behalf of transaction
     * tr and store the first claimed slot in *offset.
     *
     * Requests with cnt > 1 and a non-NULL offsets are handed to
     * claim_segmentsV2() and its result is returned directly. Otherwise the
     * segment list is scanned for a deleted segment with ts < oldest that holds
     * at least cnt slots; when none is found the tail segment is extended (if it
     * carries tr->tid and is not deleted) or a new segment is appended.
     *
     * The table lock is taken and released here unless locked is true.
     * NOTE(review): with locked == true the caller presumably already holds the
     * table lock (key_claim_tab does) -- confirm for other callers.
     *
     * Returns LOG_OK on success and LOG_ERR when split_segment() or
     * new_segment() fails; *offset is only written on success.
     */
    4746             : static int
    4747     2006351 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
    4748             : {
    4749     2006351 :         if (cnt > 1 && offsets)
    4750       29917 :                 return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
                               /* presumably non-zero when tr already registered this table's segments -- TODO confirm */
    4751     1976434 :         int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
    4752     1976443 :         assert(s->segs);
    4753     1976443 :         ulng oldest = store_oldest(tr->store, NULL);
    4754     1976436 :         BUN slot = 0;
    4755     1976436 :         int reused = 0;
    4756             : 
    4757     1976436 :         if (!locked)
    4758     1858093 :                 lock_table(tr->store, t->base.id);
    4759             :         /* naive vacuum approach, iterate through segments, check for large enough deleted segments
    4760             :          * or create new segment at the end */
    4761     8358146 :         for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
    4762     6452538 :                 if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* re-use old deleted or rolledback append */
    4763             : 
                                       /* same size test as in the enclosing condition, so this is always true here */
    4764       70837 :                         if ((seg->end - seg->start) >= cnt) {
    4765             : 
    4766             :                                 /* if previous is claimed before we could simply adjust the end/start */
    4767       70837 :                                 if (p && p->ts == tr->tid && !p->deleted) {
                                                       /* grow the previous segment into the deleted one; no split needed */
    4768       56697 :                                         slot = p->end;
    4769       56697 :                                         p->end += cnt;
    4770       56697 :                                         seg->start += cnt;
    4771       56697 :                                         reused = 1;
    4772       56697 :                                         break;
    4773             :                                 }
    4774             :                                 /* we claimed part of the old segment, the split off part needs to stay deleted */
    4775       14140 :                                 if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
    4776             :                                         ok = LOG_ERR;
    4777             :                                         break;
    4778             :                                 }
    4779             :                         }
                                       /* seg is now the claimed part: mark it as a live segment of this transaction */
    4780       14140 :                         seg->ts = tr->tid;
    4781       14140 :                         seg->deleted = false;
    4782       14140 :                         slot = seg->start;
    4783       14140 :                         reused = 1;
    4784       14140 :                         break;
    4785             :                 }
    4786             :         }
                               /* nothing reusable found: claim the slots at the tail of the segment list */
    4787     1976445 :         if (ok == LOG_OK && !reused) {
    4788     1905609 :                 if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
    4789     1870978 :                         slot = s->segs->t->end;
    4790     1870978 :                         s->segs->t->end += cnt;
    4791             :                 } else {
    4792       34631 :                         if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
    4793             :                                 ok = LOG_ERR;
    4794             :                         } else {
                                                       /* first segment of the list also becomes its head */
    4795       34631 :                                 if (!s->segs->h)
    4796           0 :                                         s->segs->h = s->segs->t;
    4797       34631 :                                 slot = s->segs->t->start;
    4798             :                         }
    4799             :                 }
    4800             :         }
    4801     1976445 :         if (!locked)
    4802     1858102 :                 unlock_table(tr->store, t->base.id);
    4803             : 
    4804     1976445 :         if (ok == LOG_OK) {
    4805             :                 /* hard to only add this once per transaction (probably want to change to once per new segment) */
    4806     1976446 :                 if (!in_transaction) {
    4807       46904 :                         trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
    4808       46904 :                         in_transaction = true;
    4809             :                 }
                               /* in_transaction is always true here, so only the NOT_TO_BE_LOGGED test matters */
    4810     1976446 :                 if (in_transaction && !NOT_TO_BE_LOGGED(t))
    4811     1976021 :                         tr->logchanges += (int) cnt;
    4812     1976446 :                 *offset = slot;
    4813             :         }
    4814             :         return ok;
    4815             : }
    4816             : 
    4817             : /*
    4818             :  * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
    4819             :  * of the global transaction and mark the newly added storage slots unused on the global
    4820             :  * level but used on the local transaction level. Besides this the local transaction needs
    4821             :  * to update (and mark unused) any slot in between the old end and new slots.
    4822             :  * */
    /*
     * claim_tab: look up the storage of table t with bind_del_data() and forward
     * the request to claim_segments() with locked == false, so claim_segments()
     * does the table locking itself.
     *
     * Returns LOG_ERR when bind_del_data() yields NULL, otherwise whatever
     * claim_segments() returns. offset/offsets are passed through unchanged.
     */
    4823             : static int
    4824     1888042 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4825             : {
    4826     1888042 :         storage *s;
    4827             : 
    4828             :         /* we have a single segment structure for each persistent table
    4829             :          * for temporary tables each has its own */
    4830     1888042 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4831             :                 return LOG_ERR;
    4832             : 
    4833     1888170 :         return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
    4834             : }
    4835             : 
    4836             : /* some tables cannot be updated concurrently (user/roles etc) */
    /*
     * key_claim_tab: variant of claim_tab that first checks for conflicts.
     *
     * Under the table lock it calls segments_conflict(tr, s->segs, 1); a
     * non-zero result releases the lock and returns LOG_CONFLICT. Otherwise the
     * slots are claimed via claim_segments() with locked == true (the lock is
     * already held here) and the lock is released afterwards.
     *
     * Returns LOG_ERR when bind_del_data() yields NULL, LOG_CONFLICT on a
     * detected conflict, otherwise the result of claim_segments().
     */
    4837             : static int
    4838      118348 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
    4839             : {
    4840      118348 :         storage *s;
    4841      118348 :         int res = 0;
    4842             : 
    4843             :         /* we have a single segment structure for each persistent table
    4844             :          * for temporary tables each has its own */
    4845      118348 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4846             :                 /* TODO check for other inserts ! */
    4847             :                 return LOG_ERR;
    4848             : 
    4849      118348 :         lock_table(tr->store, t->base.id);
    4850      118348 :         if ((res = segments_conflict(tr, s->segs, 1))) {
    4851           4 :                 unlock_table(tr->store, t->base.id);
    4852           4 :                 return LOG_CONFLICT;
    4853             :         }
    4854      118344 :         res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
    4855      118344 :         unlock_table(tr->store, t->base.id);
    4856      118344 :         return res;
    4857             : }
    4858             : 
    /*
     * tab_validate: run segments_conflict() on the segments of table t while
     * holding the table lock.
     *
     * uncommitted is passed straight through to segments_conflict().
     * Returns LOG_ERR when bind_del_data() yields NULL, LOG_CONFLICT when
     * segments_conflict() returns non-zero, and LOG_OK otherwise.
     */
    4859             : static int
    4860       13968 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
    4861             : {
    4862       13968 :         storage *s;
    4863       13968 :         int res = 0;
    4864             : 
    4865       13968 :         if ((s = bind_del_data(tr, t, NULL)) == NULL)
    4866             :                 return LOG_ERR;
    4867             : 
    4868       13968 :         lock_table(tr->store, t->base.id);
    4869       13968 :         res = segments_conflict(tr, s->segs, uncommitted);
    4870       13968 :         unlock_table(tr->store, t->base.id);
    4871       13968 :         return res ? LOG_CONFLICT : LOG_OK;
    4872             : }
    4873             : 
    /*
     * has_deletes_in_range: walk the segment list starting at s and report
     * whether a segment overlapping [start, end) satisfies SEG_IS_DELETED(s, tr).
     *
     * Returns 0 when no such segment exists. Otherwise returns the full length
     * (s->end - s->start) of the first one found: the second loop stops as soon
     * as cnt is non-zero, so the value works as a flag rather than a total count.
     */
    4874             : static size_t
    4875     1408044 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
    4876             : {
    4877     1408044 :         size_t cnt = 0;
    4878             : 
                               /* skip the segments that end at or before start */
    4879     1580955 :         for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
    4880             :                 ;
    4881             : 
                               /* inspect the segments that begin before end, until the first deleted one */
    4882     3498816 :         for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
    4883     2090754 :                 if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
    4884      120807 :                         cnt += s->end - s->start;
    4885             :         }
    4886     1408062 :         return cnt;
    4887             : }
    4888             : 
    /*
     * segments2cands: build the candidate BAT for rows [start, end) of table t
     * from the segment list of storage S, as seen by transaction tr.
     *
     * When has_deletes_in_range() reports nothing, the result is
     * BATdense(start, start, end-start). Otherwise a TYPE_msk BAT of nr bits is
     * filled, one bit per row (1 where SEG_IS_VALID(s, tr) holds, 0 elsewhere),
     * and handed to BATmaskedcands(); the mask BAT is released with BBPreclaim().
     *
     * The segment list is read under the table lock, which is released before
     * the last partial word is stored and the candidate BAT is created.
     *
     * Returns NULL when COLnew() fails; the result of BATmaskedcands() is
     * returned unchecked.
     */
    4889             : static BAT *
    4890     1408017 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
    4891             : {
    4892     1408017 :         lock_table(tr->store, t->base.id);
    4893     1408063 :         segment *s = S->segs->h;
    4894             :         /* step one no deletes -> dense range */
                               /* cur: 32-bit mask word being assembled; pos: number of mask bits produced so far */
    4895     1408063 :         uint32_t cur = 0;
    4896     1408063 :         size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
    4897     1408069 :         if (!dnr) {
    4898     1301929 :                 unlock_table(tr->store, t->base.id);
    4899     1301924 :                 return BATdense(start, start, end-start);
    4900             :         }
    4901             : 
    4902      106140 :         BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
    4903      106140 :         if (!b) {
    4904           0 :                 unlock_table(tr->store, t->base.id);
    4905           0 :                 return NULL;
    4906             :         }
    4907             : 
                               /* the mask is written through dst one 32-bit word at a time */
    4908      106140 :         uint32_t *restrict dst = Tloc(b, 0);
    4909    59490587 :         for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
                                       /* NOTE(review): a segment with s->end == start is not skipped here, but
                                        * the clipping below leaves lnr == 0 for it (given s->start <= start) */
    4910    59442140 :                 if (s->end < start)
    4911       83316 :                         continue;
    4912    59358824 :                 if (s->start >= end)
    4913             :                         break;
    4914    59301131 :                 msk m = (SEG_IS_VALID(s, tr));
                                       /* lnr: number of rows of this segment that fall inside [start, end) */
    4915    59301131 :                 size_t lnr = s->end-s->start;
    4916    59301131 :                 if (s->start < start)
    4917       10750 :                         lnr -= (start - s->start);
    4918    59301131 :                 if (s->end > end)
    4919        7816 :                         lnr -= s->end - end;
    4920             : 
    4921    59301131 :                 if (m) {
                                               /* valid rows: emit lnr one-bits.
                                                * NOTE: the local 'end' declared here is a bit index inside the
                                                * current word and shadows the function parameter 'end' */
    4922      972854 :                         size_t used = pos&31, end = 32;
    4923      972854 :                         if (used) {
                                                       /* current word is partly filled: set bits [used, end) first */
    4924      843770 :                                 if (lnr < (32-used))
    4925      489833 :                                         end = used + lnr;
    4926      843770 :                                 assert(end > used);
    4927      843770 :                                 cur |= ((1U << (end - used)) - 1) << used;
    4928      843770 :                                 lnr -= end - used;
    4929      843770 :                                 pos += end - used;
    4930      843770 :                                 if (end == 32) {
    4931      353937 :                                         *dst++ = cur;
    4932      353937 :                                         cur = 0;
    4933             :                                 }
    4934             :                         }
                                               /* then whole words of ones, then the low bits of a fresh word */
    4935      972854 :                         size_t full = lnr/32;
    4936      972854 :                         size_t rest = lnr%32;
    4937      972854 :                         if (full > 0) {
    4938      273427 :                                 memset(dst, ~0, full * sizeof(*dst));
    4939      273427 :                                 dst += full;
    4940      273427 :                                 lnr -= full * 32;
    4941      273427 :                                 pos += full * 32;
    4942             :                         }
    4943      972854 :                         if (rest > 0) {
    4944      455704 :                                 cur |= (1U << rest) - 1;
    4945      455704 :                                 lnr -= rest;
    4946      455704 :                                 pos += rest;
    4947             :                         }
    4948      972854 :                         assert(lnr==0);
    4949             :                 } else {
                                               /* rows that are not valid: emit lnr zero-bits, i.e. only advance
                                                * pos/dst and leave the bits of cur untouched; 'end' again
                                                * shadows the function parameter */
    4950    58328277 :                         size_t used = pos&31, end = 32;
    4951    58328277 :                         if (used) {
    4952    56496093 :                                 if (lnr < (32-used))
    4953    54578819 :                                         end = used + lnr;
    4954             : 
    4955    56496093 :                                 pos+= (end-used);
    4956    56496093 :                                 lnr-= (end-used);
    4957    56496093 :                                 if (end == 32) {
    4958     1917274 :                                         *dst++ = cur;
    4959     1917274 :                                         cur = 0;
    4960             :                                 }
    4961             :                         }
    4962    58328277 :                         size_t full = lnr/32;
    4963    58328277 :                         size_t rest = lnr%32;
    4964    58328277 :                         memset(dst, 0, full * sizeof(*dst));
    4965    58328277 :                         dst += full;
    4966    58328277 :                         lnr -= full * 32;
    4967    58328277 :                         pos += full * 32;
    4968    58328277 :                         pos+= rest;
    4969    58328277 :                         lnr-= rest;
    4970    58328277 :                         assert(lnr==0);
    4971             :                 }
    4972             :         }
    4973             : 
    4974      106140 :         unlock_table(tr->store, t->base.id);
                               /* store the last, partially filled word */
    4975      106140 :         if (pos%32)
    4976      101405 :                 *dst=cur;
    4977      106140 :         BATsetcount(b, nr);
    4978      106140 :         bn = BATmaskedcands(start, nr, b, true);
    4979      106139 :         BBPreclaim(b);
                               /* presumably keeps pos referenced when assert() is compiled out */
    4980      106138 :         (void)pos;
    4981      106138 :         assert (pos == nr);
    4982             :         return bn;
    4983             : }
    4984             : 
    /*
     * bind_cands: return the candidate BAT (as void *) for slice part_nr out of
     * nr_of_parts of table t, as seen by transaction tr.
     *
     * The row range [0, nr), with nr taken from segs_end(), is cut into
     * nr_of_parts slices of nr/nr_of_parts rows each; the last slice
     * (part_nr == nr_of_parts-1) also takes the remainder up to nr. The slice is
     * turned into candidates by segments2cands().
     *
     * Returns NULL when tab_timestamp_storage() yields no storage, and an empty
     * dense BAT (BATdense(0, 0, 0)) when nr is 0.
     * NOTE(review): nr_of_parts is used as a divisor and is not checked for 0
     * here -- callers presumably always pass >= 1; confirm.
     */
    4985             : static void *                                   /* BAT * */
    4986     1413543 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
    4987             : {
    4988             :         /* with nr_of_parts - part_nr we can adjust parts */
    4989     1413543 :         storage *s = tab_timestamp_storage(tr, t);
    4990             : 
    4991     1413717 :         if (!s)
    4992             :                 return NULL;
    4993     1413717 :         size_t nr = segs_end(s->segs, tr, t);
    4994             : 
    4995     1414134 :         if (!nr)
    4996        6103 :                 return BATdense(0, 0, 0);
    4997             : 
    4998             :         /* compute proper part */
    4999     1408031 :         size_t part_size = nr/nr_of_parts;
    5000     1408031 :         size_t start = part_size * part_nr;
    5001     1408031 :         size_t end = start + part_size;
    5002     1408031 :         if (part_nr == (nr_of_parts-1))
    5003     1260751 :                 end = nr;
    5004     1408031 :         assert(end <= nr);
    5005     1408031 :         return segments2cands(s, tr, t, start, end);
    5006             : }
    5007             : 
    /*
     * swap_bats: replace the data BAT of column col by bn inside transaction tr.
     *
     * Refused with LOG_CONFLICT when segments_in_transaction(tr, col->t) is
     * non-zero. The delta obtained from bind_col_data() is registered with the
     * transaction when it differs from the one col->data pointed at on entry.
     * The delta's bid, uibid and uvbid BATs are released with temp_destroy(), bn
     * is set to BAT_READ access and becomes the new bid, and the update fields
     * (uibid, uvbid, ucnt) are reset with cleared set to true.
     *
     * Returns LOG_OK on success; when bind_col_data() fails, LOG_CONFLICT if it
     * flagged update_conflict and LOG_ERR otherwise.
     */
    5008             : static int
    5009           1 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
    5010             : {
    5011           1 :         bool update_conflict = false;
    5012             : 
    5013           1 :         if (segments_in_transaction(tr, col->t))
    5014             :                 return LOG_CONFLICT;
    5015             : 
                               /* remember the delta in place before binding, to detect a newly created one */
    5016           1 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5017             : 
    5018           1 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5019           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5020           1 :         assert(d && d->cs.ts == tr->tid);
    5021           1 :         if (odelta != d)
    5022           1 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
    5023           1 :         if (d->cs.bid)
    5024           1 :                 temp_destroy(d->cs.bid);
    5025           1 :         if (d->cs.uibid)
    5026           1 :                 temp_destroy(d->cs.uibid);
    5027           1 :         if (d->cs.uvbid)
    5028           1 :                 temp_destroy(d->cs.uvbid);
    5029           1 :         bat_set_access(bn, BAT_READ);
    5030           1 :         d->cs.bid = temp_create(bn);
    5031           1 :         d->cs.uibid = 0;
    5032           1 :         d->cs.uvbid = 0;
    5033           1 :         d->cs.ucnt = 0;
    5034           1 :         d->cs.cleared = true;
    5035           1 :         d->cs.ts = tr->tid;
    5036           1 :         ATOMIC_INIT(&d->cs.refcnt, 1);
    5037           1 :         return LOG_OK;
    5038             : }
    5039             : 
    /*
     * col_compress: install o as the data BAT of column col with storage type
     * st and, when u is non-NULL, install u as the delta's ebid BAT.
     *
     * Refused with LOG_CONFLICT when segments_in_transaction(tr, col->t) is
     * non-zero. The delta obtained from bind_col_data() is registered with the
     * transaction when it differs from the one col->data pointed at on entry.
     * Both o and u are passed through transfer_to_systrans() before use.
     *
     * Returns LOG_OK on success; LOG_CONFLICT/LOG_ERR when bind_col_data()
     * fails; LOG_ERR when transfer_to_systrans() returns NULL.
     * NOTE(review): both LOG_ERR returns after transfer_to_systrans() happen
     * after temp_destroy() was already called on the old bid/ebid, and the
     * corresponding field is left holding the old id -- confirm callers abort
     * the transaction in that case.
     */
    5040             : static int
    5041          58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
    5042             : {
    5043          58 :         bool update_conflict = false;
    5044             : 
    5045          58 :         if (segments_in_transaction(tr, col->t))
    5046             :                 return LOG_CONFLICT;
    5047             : 
    5048          58 :         sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
    5049             : 
    5050          58 :         if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
    5051           0 :                 return update_conflict ? LOG_CONFLICT : LOG_ERR;
    5052          58 :         assert(d && d->cs.ts == tr->tid);
    5053          58 :         assert(col->t->persistence != SQL_DECLARED_TABLE);
    5054          58 :         if (odelta != d)
    5055          58 :                 trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
    5056             : 
    5057          58 :         d->cs.st = st;
    5058          58 :         d->cs.cleared = true;
    5059          58 :         if (d->cs.bid)
    5060          58 :                 temp_destroy(d->cs.bid);
    5061          58 :         o = transfer_to_systrans(o);
    5062          58 :         if (o == NULL)
    5063             :                 return LOG_ERR;
    5064          58 :         bat_set_access(o, BAT_READ);
    5065          58 :         d->cs.bid = temp_create(o);
    5066          58 :         if (u) {
    5067          53 :                 if (d->cs.ebid)
    5068           0 :                         temp_destroy(d->cs.ebid);
    5069          53 :                 u = transfer_to_systrans(u);
    5070          53 :                 if (u == NULL)
    5071             :                         return LOG_ERR;
    5072          53 :                 d->cs.ebid = temp_create(u);
    5073             :         }
    5074             :         return LOG_OK;
    5075             : }
    5076             : 
    /*
     * bat_storage_init: fill the store_functions table sf with pointers to the
     * functions of the same name in this file (binding, claiming, append/update/
     * delete, counting and statistics, dup/create/destroy/drop, clear_table,
     * swap_bats and col_compress).
     *
     * Only the members assigned below are written; nothing is returned.
     */
    5077             : void
    5078         341 : bat_storage_init( store_functions *sf)
    5079             : {
    5080         341 :         sf->bind_col = &bind_col;
    5081         341 :         sf->bind_updates = &bind_updates;
    5082         341 :         sf->bind_updates_idx = &bind_updates_idx;
    5083         341 :         sf->bind_idx = &bind_idx;
    5084         341 :         sf->bind_cands = &bind_cands;
    5085             : 
    5086         341 :         sf->claim_tab = &claim_tab;
    5087         341 :         sf->key_claim_tab = &key_claim_tab;
    5088         341 :         sf->tab_validate = &tab_validate;
    5089             : 
    5090         341 :         sf->append_col = &append_col;
    5091         341 :         sf->append_idx = &append_idx;
    5092             : 
    5093         341 :         sf->update_col = &update_col;
    5094         341 :         sf->update_idx = &update_idx;
    5095             : 
    5096         341 :         sf->delete_tab = &delete_tab;
    5097             : 
    5098         341 :         sf->count_del = &count_del;
    5099         341 :         sf->count_col = &count_col;
    5100         341 :         sf->count_idx = &count_idx;
    5101         341 :         sf->dcount_col = &dcount_col;
    5102         341 :         sf->min_max_col = &min_max_col;
    5103         341 :         sf->set_stats_col = &set_stats_col;
    5104         341 :         sf->sorted_col = &sorted_col;
    5105         341 :         sf->unique_col = &unique_col;
    5106         341 :         sf->double_elim_col = &double_elim_col;
    5107         341 :         sf->col_stats = &col_stats;
    5108         341 :         sf->col_set_range = &col_set_range;
    5109         341 :         sf->col_not_null = &col_not_null;
    5110             : 
    5111         341 :         sf->col_dup = &col_dup;
    5112         341 :         sf->idx_dup = &idx_dup;
    5113         341 :         sf->del_dup = &del_dup;
    5114             : 
    5115         341 :         sf->create_col = &create_col;    /* create and add to change list */
    5116         341 :         sf->create_idx = &create_idx;
    5117         341 :         sf->create_del = &create_del;
    5118             : 
    5119         341 :         sf->destroy_col = &destroy_col;  /* free resources */
    5120         341 :         sf->destroy_idx = &destroy_idx;
    5121         341 :         sf->destroy_del = &destroy_del;
    5122             : 
    5123         341 :         sf->drop_col = &drop_col;                /* add drop to change list */
    5124         341 :         sf->drop_idx = &drop_idx;
    5125         341 :         sf->drop_del = &drop_del;
    5126             : 
    5127         341 :         sf->clear_table = &clear_table;
    5128             : 
    5129         341 :         sf->swap_bats = &swap_bats;
    5130         341 :         sf->col_compress = &col_compress;
    5131         341 : }
    5132             : 
    5133             : #if 0
    /*
     * log_get_nr_inserted: compiled out (enclosed in #if 0).
     *
     * Returns BATcount(ins) - ins->batInserted for the bid BAT of column fc when
     * that difference is positive, and 0 otherwise (also for a NULL fc, when
     * GDKinmemory(0) is true, or when fc is not marked atime/allocated).
     * The offset parameter is never written by this function.
     */
    5134             : static lng
    5135             : log_get_nr_inserted(sql_column *fc, lng *offset)
    5136             : {
    5137             :         lng cnt = 0;
    5138             : 
    5139             :         if (!fc || GDKinmemory(0))
    5140             :                 return 0;
    5141             : 
    5142             :         if (fc->base.atime && fc->base.allocated) {
    5143             :                 sql_delta *fb = fc->data;
    5144             :                 BAT *ins = temp_descriptor(fb->cs.bid);
    5145             : 
    5146             :                 if (ins && BATcount(ins) > 0 && BATcount(ins) > ins->batInserted) {
    5147             :                         cnt = BATcount(ins) - ins->batInserted;
    5148             :                 }
    5149             :                 bat_destroy(ins);
    5150             :         }
    5151             :         return cnt;
    5152             : }
    5153             : 
    /*
     * log_get_nr_deleted: compiled out (enclosed in #if 0).
     *
     * Returns BATcount(db) - db->batInserted for the bid BAT of table ft's
     * storage when that difference is positive, and stores db->batInserted in
     * *offset in that case only. Returns 0 otherwise (also for a NULL ft, when
     * GDKinmemory(0) is true, or when ft is not marked atime/allocated), leaving
     * *offset untouched.
     */
    5154             : static lng
    5155             : log_get_nr_deleted(sql_table *ft, lng *offset)
    5156             : {
    5157             :         lng cnt = 0;
    5158             : 
    5159             :         if (!ft || GDKinmemory(0))
    5160             :                 return 0;
    5161             : 
    5162             :         if (ft->base.atime && ft->base.allocated) {
    5163             :                 storage *fdb = ft->data;
    5164             :                 BAT *db = temp_descriptor(fdb->cs.bid);
    5165             : 
    5166             :                 if (db && BATcount(db) > 0 && BATcount(db) > db->batInserted) {
    5167             :                         cnt = BATcount(db) - db->batInserted;
    5168             :                         *offset = db->batInserted;
    5169             :                 }
    5170             :                 bat_destroy(db);
    5171             :         }
    5172             :         return cnt;
    5173             : }
    5174             : #endif

Generated by: LCOV version 1.14