Line data Source code
1 : /*
2 : * SPDX-License-Identifier: MPL-2.0
3 : *
4 : * This Source Code Form is subject to the terms of the Mozilla Public
5 : * License, v. 2.0. If a copy of the MPL was not distributed with this
6 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 : *
8 : * Copyright 2024 MonetDB Foundation;
9 : * Copyright August 2008 - 2023 MonetDB B.V.;
10 : * Copyright 1997 - July 2008 CWI.
11 : */
12 :
13 : #include "monetdb_config.h"
14 : #include "bat_storage.h"
15 : #include "bat_utils.h"
16 : #include "sql_string.h"
17 : #include "gdk_atoms.h"
18 : #include "gdk_atoms.h"
19 : #include "matomic.h"
20 :
21 : #define FATAL_MERGE_FAILURE "Out Of Memory during critical merge operation: %s"
22 : #define NOT_TO_BE_LOGGED(t) (isUnloggedTable(t) || isTempTable(t))
23 :
24 : static int log_update_col( sql_trans *tr, sql_change *c);
25 : static int log_update_idx( sql_trans *tr, sql_change *c);
26 : static int log_update_del( sql_trans *tr, sql_change *c);
27 : static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
28 : static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
29 : static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
30 : static int log_create_col(sql_trans *tr, sql_change *change);
31 : static int log_create_idx(sql_trans *tr, sql_change *change);
32 : static int log_create_del(sql_trans *tr, sql_change *change);
33 : static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
34 : static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
35 : static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
36 : static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
37 : static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
38 : static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
39 : static int tc_gc_upd_col( sql_store Store, sql_change *c, ulng oldest);
40 : static int tc_gc_upd_idx( sql_store Store, sql_change *c, ulng oldest);
41 :
42 : static void merge_delta( sql_delta *obat);
43 :
44 : /* valid
45 : * !deleted && VALID_4_READ(TS, tr) existing or newly created segment
46 : * deleted && TS > tr->ts && OLDTS < tr->ts deleted after current transaction
47 : */
48 :
49 : #define VALID_4_READ(TS,tr) \
50 : (TS == tr->tid || (tr->parent && tr_version_of_parent(tr, TS)) || TS < tr->ts)
51 :
52 : /* when changed, check if the old status is still valid */
53 : #define OLD_VALID_4_READ(TS,OLDTS,tr) \
54 : (OLDTS && TS != tr->tid && TS > tr->ts && OLDTS < tr->ts)
55 :
56 : #define SEG_VALID_4_DELETE(seg,tr) \
57 : (!seg->deleted && VALID_4_READ(seg->ts, tr))
58 :
59 : /* Delete (in current trans or by some other finised transaction, or re-used segment which used to be deleted */
60 : #define SEG_IS_DELETED(seg,tr) \
61 : ((seg->deleted && (VALID_4_READ(seg->ts, tr) || !OLD_VALID_4_READ(seg->ts, seg->oldts, tr))) || \
62 : (!seg->deleted && !VALID_4_READ(seg->ts, tr)))
63 :
64 : /* A segment is part of the current transaction is someway or is deleted by some other transaction but use to be valid */
65 : #define SEG_IS_VALID(seg, tr) \
66 : ((!seg->deleted && VALID_4_READ(seg->ts, tr)) || \
67 : (seg->deleted && OLD_VALID_4_READ(seg->ts, seg->oldts, tr)))
68 :
69 : static inline BAT *
70 5251 : transfer_to_systrans(BAT *b)
71 : {
72 : /* transfer a BAT from the TRANSIENT farm to the SYSTRANS farm */
73 5251 : MT_lock_set(&b->theaplock);
74 5251 : if (VIEWtparent(b) || VIEWvtparent(b)) {
75 19 : MT_lock_unset(&b->theaplock);
76 19 : BAT *bn = COLcopy(b, b->ttype, true, SYSTRANS);
77 19 : BBPreclaim(b);
78 19 : return bn;
79 : }
80 5232 : if (b->theap->farmid == TRANSIENT ||
81 14 : (b->tvheap && b->tvheap->farmid == TRANSIENT)) {
82 4839 : QryCtx *qc = MT_thread_get_qry_ctx();
83 4839 : if (qc) {
84 2463 : if (b->theap->farmid == TRANSIENT && b->theap->parentid == b->batCacheid) {
85 2463 : ATOMIC_SUB(&qc->datasize, b->theap->size);
86 2463 : b->theap->farmid = SYSTRANS;
87 2463 : b->batRole = SYSTRANS;
88 : }
89 2463 : if (b->tvheap && b->tvheap->farmid == TRANSIENT && b->tvheap->parentid == b->batCacheid) {
90 1068 : ATOMIC_SUB(&qc->datasize, b->tvheap->size);
91 1068 : b->tvheap->farmid = SYSTRANS;
92 : }
93 : }
94 : }
95 5232 : MT_lock_unset(&b->theaplock);
96 5232 : return b;
97 : }
98 :
99 : static void
100 25064188 : lock_table(sqlstore *store, sqlid id)
101 : {
102 25064188 : MT_lock_set(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
103 25067545 : }
104 :
105 : static void
106 25067388 : unlock_table(sqlstore *store, sqlid id)
107 : {
108 25067388 : MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
109 25067507 : }
110 :
111 : static void
112 20069435 : lock_column(sqlstore *store, sqlid id)
113 : {
114 20069435 : MT_lock_set(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
115 20068904 : }
116 :
117 : static void
118 20069330 : unlock_column(sqlstore *store, sqlid id)
119 : {
120 20069330 : MT_lock_unset(&store->column_locks[id&(NR_COLUMN_LOCKS-1)]);
121 20068936 : }
122 :
123 : static void
124 112982 : trans_add_obj(sql_trans *tr, sql_base *b, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
125 : {
126 112982 : assert(cleanup);
127 112982 : trans_add(tr, dup_base(b), data, cleanup, commit, log);
128 112982 : }
129 :
130 : static void
131 132948 : trans_add_table(sql_trans *tr, sql_base *b, sql_table *t, void *data, tc_cleanup_fptr cleanup, tc_commit_fptr commit, tc_log_fptr log)
132 : {
133 132948 : assert(cleanup);
134 132948 : dup_base(&t->base);
135 132947 : trans_add(tr, b, data, cleanup, commit, log);
136 132940 : }
137 :
138 : static int
139 79827 : tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
140 : {
141 79827 : segment *s = change->data;
142 :
143 79827 : if (s->ts <= oldest) {
144 33141 : while(s) {
145 21076 : segment *n = s->prev;
146 21076 : ATOMIC_PTR_DESTROY(&s->next);
147 21076 : _DELETE(s);
148 21076 : s = n;
149 : }
150 12065 : sqlstore *store = Store;
151 12065 : table_destroy(store, (sql_table*)change->obj);
152 12065 : return 1;
153 : }
154 : return LOG_OK;
155 : }
156 :
157 : static void
158 21076 : mark4destroy(segment *s, sql_change *c, ulng commit_ts)
159 : {
160 : /* we can only be accessed by anything older then commit_ts */
161 21076 : if (c->cleanup == &tc_gc_seg)
162 9011 : s->prev = c->data;
163 : else
164 12065 : c->cleanup = &tc_gc_seg;
165 21076 : c->data = s;
166 21076 : s->ts = commit_ts;
167 16676 : }
168 :
169 : static segment *
170 87424 : new_segment(segment *o, sql_trans *tr, size_t cnt)
171 : {
172 87424 : segment *n = (segment*)GDKmalloc(sizeof(segment));
173 :
174 87424 : assert(tr);
175 87424 : if (n) {
176 87424 : *n = (segment) {
177 87424 : .ts = tr->tid,
178 : .oldts = 0,
179 : .deleted = false,
180 : .start = 0,
181 : .end = cnt,
182 : .next = ATOMIC_PTR_VAR_INIT(NULL),
183 : .prev = NULL,
184 : };
185 87424 : if (o) {
186 36064 : n->start += o->end;
187 36064 : n->end += o->end;
188 36064 : ATOMIC_PTR_SET(&o->next, n);
189 : }
190 : }
191 87424 : return n;
192 : }
193 :
194 : static segment *
195 90265 : split_segment(segments *segs, segment *o, segment *p, sql_trans *tr, size_t start, size_t cnt, bool deleted)
196 : {
197 90265 : assert(tr);
198 90265 : if (o->start == start && o->end == start+cnt) {
199 10558 : assert(o->deleted != deleted || o->ts < TRANSACTION_ID_BASE);
200 10558 : o->oldts = o->ts;
201 10558 : o->ts = tr->tid;
202 10558 : o->deleted = deleted;
203 10558 : return o;
204 : }
205 79707 : segment *n = (segment*)GDKmalloc(sizeof(segment));
206 :
207 79707 : if (!n)
208 : return NULL;
209 79707 : n->prev = NULL;
210 :
211 79707 : if (o->ts == tr->tid) {
212 4020 : n->oldts = 0;
213 4020 : n->ts = 1;
214 4020 : n->deleted = true;
215 : } else {
216 75687 : n->oldts = o->ts;
217 75687 : n->ts = tr->tid;
218 75687 : n->deleted = deleted;
219 : }
220 79707 : if (start == o->start) {
221 : /* 2-way split: o remains latter part of segment, new one is
222 : * inserted before */
223 64742 : n->start = o->start;
224 64742 : n->end = n->start + cnt;
225 64742 : ATOMIC_PTR_INIT(&n->next, o);
226 64742 : if (segs->h == o)
227 461 : segs->h = n;
228 64742 : if (p)
229 64281 : ATOMIC_PTR_SET(&p->next, n);
230 64742 : o->start = n->end;
231 14965 : } else if (start+cnt == o->end) {
232 : /* 2-way split: o remains first part of segment, new one is
233 : * added after */
234 5462 : n->start = o->end - cnt;
235 5462 : n->end = o->end;
236 5462 : ATOMIC_PTR_INIT(&n->next, ATOMIC_PTR_GET(&o->next));
237 5462 : ATOMIC_PTR_SET(&o->next, n);
238 5462 : if (segs->t == o)
239 812 : segs->t = n;
240 5462 : o->end = n->start;
241 : } else {
242 : /* 3-way split: o remains first part of segment, two new ones
243 : * are added after */
244 9503 : segment *n2 = GDKmalloc(sizeof(segment));
245 9503 : if (n2 == NULL) {
246 0 : GDKfree(n);
247 0 : return NULL;
248 : }
249 9503 : ATOMIC_PTR_INIT(&n->next, n2);
250 9503 : n->start = start;
251 9503 : n->end = start + cnt;
252 9503 : *n2 = *o;
253 9503 : ATOMIC_PTR_INIT(&n2->next, ATOMIC_PTR_GET(&o->next));
254 9503 : n2->start = n->end;
255 9503 : n2->prev = NULL;
256 9503 : if (segs->t == o)
257 3495 : segs->t = n2;
258 9503 : ATOMIC_PTR_SET(&o->next, n);
259 9503 : o->end = start;
260 : }
261 : return n;
262 : }
263 :
/*
 * Undo tr's changes to the segment list segs.
 *
 * Every segment stamped with tr->tid is reverted:
 * - its deleted flag is inverted, except that a segment whose oldts equals
 *   tr->tid always ends up deleted;
 * - ts is restored from oldts (0 when oldts was tr->tid);
 * - oldts is cleared.
 *
 * During the same walk, adjacent segments whose ts is <= oldest and whose
 * deleted flags match are coalesced.  The absorbed segment is queued through
 * mark4destroy() with the current store timestamp.
 */
static void
rollback_segments(segments *segs, sql_trans *tr, sql_change *change, ulng oldest)
{
	segment *cur = segs->h, *seg = NULL;
	for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
		if (cur->ts == tr->tid) { /* revert */
			/* cur->ts == tr->tid here, so the second operand is oldts == tr->tid */
			cur->deleted = !cur->deleted || (cur->ts == cur->oldts);
			cur->ts = cur->oldts==tr->tid?0:cur->oldts; /* need old ts */
			cur->oldts = 0;
		}
		if (cur->ts <= oldest) { /* possibly merge range */
			if (!seg) { /* skip first */
				seg = cur;
			} else if (seg->end == cur->start && seg->deleted == cur->deleted) {
				/* merge with previous */
				seg->end = cur->end;
				ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
				if (cur == segs->t)
					segs->t = seg;
				mark4destroy(cur, change, store_get_timestamp(tr->store));
				/* continue the walk from seg so its new successor is examined next */
				cur = seg;
			} else {
				seg = cur; /* begin of new merge */
			}
		}
	}
}
291 :
292 : static size_t
293 102784 : segs_end_include_deleted( segments *segs, sql_trans *tr)
294 : {
295 102784 : size_t cnt = 0;
296 102784 : segment *s = segs->h, *l = NULL;
297 :
298 509545 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
299 406761 : if (s->ts == tr->tid || SEG_IS_VALID(s, tr))
300 : l = s;
301 : }
302 102784 : if (l)
303 102777 : cnt = l->end;
304 102784 : return cnt;
305 : }
306 :
/*
 * Write tr's segment changes into the deletion bitmap stored in cs->bid.
 * The bitmap has one bit per row, 32 rows per uint32_t word, and a set bit
 * means deleted.
 *
 * Steps:
 * 1. Grow the BAT so it covers segs_end_include_deleted() rows, with the
 *    capacity rounded up to a multiple of 32.
 * 2. Reset its sortedness/key properties and mark the heap dirty.
 * 3. For every non-empty segment with ts == tr->tid (below that end), set
 *    the segment's bits if it is deleted, or clear them if it is live.
 *    Rows between the running end (initially the old BATcount) and the
 *    start of such a segment are first marked deleted.
 * 4. Raise the BAT count to the new end when it grew.
 *
 * Returns LOG_OK, or LOG_ERR when the BAT cannot be obtained or extended.
 */
static int
segments2cs(sql_trans *tr, segments *segs, column_storage *cs)
{
	/* set bits correctly */
	BAT *b = temp_descriptor(cs->bid);

	if (!b)
		return LOG_ERR;
	segment *s = segs->h;

	size_t nr = segs_end_include_deleted(segs, tr);
	size_t rounded_nr = ((nr+31)&~31);
	if (rounded_nr > BATcapacity(b) && BATextend(b, rounded_nr) != GDK_SUCCEED) {
		bat_destroy(b);
		return LOG_ERR;
	}

	/* disable all properties here */
	MT_lock_set(&b->theaplock);
	b->tsorted = false;
	b->trevsorted = false;
	b->tnosorted = 0;
	b->tnorevsorted = 0;
	b->tseqbase = oid_nil;
	b->tkey = false;
	b->tnokey[0] = 0;
	b->tnokey[1] = 0;
	b->theap->dirty = true;
	BUN cnt = BATcount(b);	/* running end: rows below this are already represented */
	MT_lock_unset(&b->theaplock);

	uint32_t *restrict dst;
	/* why hashlock ?? */
	/* NOTE(review): the hash lock is held write-mode around the bitmap update; the reason is not visible here */
	MT_rwlock_wrlock(&b->thashlock);
	for (; s ; s=ATOMIC_PTR_GET(&s->next)) {
		if (s->start >= nr)
			break;
		if (s->ts == tr->tid && s->end != s->start) {
			if (cnt < s->start) { /* first mark as deleted ! */
				/* set bits [cnt, s->start): partial leading word, full words, partial trailing word */
				size_t lnr = s->start-cnt;
				size_t pos = cnt;
				dst = (uint32_t *) Tloc(b, 0) + (pos/32);
				uint32_t cur = 0;

				size_t used = pos&31, end = 32;
				if (used) {
					if (lnr < (32-used))
						end = used + lnr;
					assert(end > used);
					cur |= ((1U << (end - used)) - 1) << used;
					lnr -= end - used;
					*dst++ |= cur;
					cur = 0;
				}
				size_t full = lnr/32;
				size_t rest = lnr%32;
				if (full > 0) {
					memset(dst, ~0, full * sizeof(*dst));
					dst += full;
					lnr -= full * 32;
				}
				if (rest > 0) {
					cur |= (1U << rest) - 1;
					lnr -= rest;
					*dst |= cur;
				}
				assert(lnr==0);
			}
			/* set (deleted) or clear (live) bits [s->start, s->end), same word-wise split */
			size_t lnr = s->end-s->start;
			size_t pos = s->start;
			dst = (uint32_t *) Tloc(b, 0) + (pos/32);
			uint32_t cur = 0;
			size_t used = pos&31, end = 32;
			if (used) {
				if (lnr < (32-used))
					end = used + lnr;
				assert(end > used);
				cur |= ((1U << (end - used)) - 1) << used;
				lnr -= end - used;
				*dst = s->deleted ? *dst | cur : *dst & ~cur;
				dst++;
				cur = 0;
			}
			size_t full = lnr/32;
			size_t rest = lnr%32;
			if (full > 0) {
				memset(dst, s->deleted?~0:0, full * sizeof(*dst));
				dst += full;
				lnr -= full * 32;
			}
			if (rest > 0) {
				cur |= (1U << rest) - 1;
				lnr -= rest;
				*dst = s->deleted ? *dst | cur : *dst & ~cur;
			}
			assert(lnr==0);
			if (cnt < s->end)
				cnt = s->end;
		}
	}
	MT_rwlock_wrunlock(&b->thashlock);
	if (nr > BATcount(b)) {
		MT_lock_set(&b->theaplock);
		BATsetcount(b, nr);
		MT_lock_unset(&b->theaplock);
	}

	bat_destroy(b);
	return LOG_OK;
}
417 :
/* TODO return LOG_OK/ERR */
/*
 * Commit tr's segments and coalesce where it is safe.
 *
 * Every segment stamped with tr->tid gets ts = commit_ts; oldts is cleared
 * for non-deleted segments.
 *
 * Adjacent segments whose ts values are both below TRANSACTION_ID_BASE and
 * whose deleted flags match are merged, unless an active transaction's ts
 * lies between the two segments' ts values.  Transactions with active == 2,
 * and the one whose ts equals tr->ts, are ignored in that check.
 *
 * A segment merged away is freed at once when commit_ts == oldest.
 * Otherwise it is queued via mark4destroy(change, commit_ts).
 */
static void
merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
{
	sqlstore* store = tr->store;
	segment *cur = s->segs->h, *seg = NULL;
	for (; cur; cur = ATOMIC_PTR_GET(&cur->next)) {
		if (cur->ts == tr->tid) {
			if (!cur->deleted)
				cur->oldts = 0;
			cur->ts = commit_ts;
		}
		if (!seg) {
			/* first segment */
			seg = cur;
		}
		else if (seg->ts < TRANSACTION_ID_BASE) {
			/* possible merge since both deleted flags are equal */
			if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
				int merge = 1;
				node *n = store->active->h;
				for (int i = 0; i < store->active->cnt; i++, n = n->next) {
					sql_trans* other = ((sql_trans*)n->data);
					ulng active = other->ts;
					if(other->active == 2)
						continue; /* pretend that another recently committed transaction is no longer active */
					if (active == tr->ts)
						continue; /* pretend that committing transaction has already committed and is no longer active */
					/* NOTE(review): stopping here (merge allowed) presumably relies on store->active being ordered by ts, so later entries are newer still -- TODO confirm */
					if (seg->ts < active && cur->ts < active)
						break;
					if (seg->ts > active && cur->ts > active)
						continue;

					assert((active > seg->ts && active < cur->ts) || (active < seg->ts && active > cur->ts));
					/* cannot safely merge since there is an active transaction between the segments */
					merge = false;
					break;
				}
				/* merge segments */
				if (merge) {
					seg->end = cur->end;
					ATOMIC_PTR_SET(&seg->next, ATOMIC_PTR_GET(&cur->next));
					if (cur == s->segs->t)
						s->segs->t = seg;
					if (commit_ts == oldest) {
						ATOMIC_PTR_DESTROY(&cur->next);
						_DELETE(cur);
					} else
						mark4destroy(cur, change, commit_ts);
					/* resume from seg; the loop step moves to seg's new successor */
					cur = seg;
					continue;
				}
			}
		}
		seg = cur;
	}
}
475 :
476 : static int
477 2167121 : segments_in_transaction(sql_trans *tr, sql_table *t)
478 : {
479 2167121 : storage *s = ATOMIC_PTR_GET(&t->data);
480 2167121 : segment *seg = s->segs->h;
481 :
482 2167121 : if (seg && s->segs->t->ts == tr->tid)
483 : return 1;
484 683700 : for (; seg ; seg=ATOMIC_PTR_GET(&seg->next)) {
485 579483 : if (seg->ts == tr->tid)
486 : return 1;
487 : }
488 : return 0;
489 : }
490 :
491 : static size_t
492 17294105 : segs_end( segments *segs, sql_trans *tr, sql_table *table)
493 : {
494 17294105 : size_t cnt = 0;
495 :
496 17294105 : lock_table(tr->store, table->base.id);
497 17297103 : segment *s = segs->h, *l = NULL;
498 :
499 17297103 : if (segs->t && SEG_IS_VALID(segs->t, tr))
500 15079108 : l = s = segs->t;
501 :
502 211025937 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
503 193728976 : if (SEG_IS_VALID(s, tr))
504 : l = s;
505 : }
506 17296961 : if (l)
507 17278663 : cnt = l->end;
508 17296961 : unlock_table(tr->store, table->base.id);
509 17297031 : return cnt;
510 : }
511 :
512 : static segments *
513 51360 : new_segments(sql_trans *tr, size_t cnt)
514 : {
515 51360 : segments *n = (segments*)GDKmalloc(sizeof(segments));
516 :
517 51360 : if (n) {
518 51360 : n->h = n->t = new_segment(NULL, tr, cnt);
519 51360 : if (!n->h) {
520 0 : GDKfree(n);
521 0 : return NULL;
522 : }
523 51360 : sql_ref_init(&n->r);
524 : }
525 : return n;
526 : }
527 :
528 : static sql_delta *
529 30331033 : timestamp_delta( sql_trans *tr, sql_delta *d)
530 : {
531 30378713 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
532 47680 : d = d->next;
533 30331141 : return d;
534 : }
535 :
536 : static sql_delta *
537 30176802 : col_timestamp_delta( sql_trans *tr, sql_column *c)
538 : {
539 30176802 : return timestamp_delta( tr, ATOMIC_PTR_GET(&c->data));
540 : }
541 :
542 : static sql_delta *
543 21614 : idx_timestamp_delta( sql_trans *tr, sql_idx *i)
544 : {
545 21614 : return timestamp_delta( tr, ATOMIC_PTR_GET(&i->data));
546 : }
547 :
548 : static storage *
549 18302681 : timestamp_storage( sql_trans *tr, storage *d)
550 : {
551 18302681 : if (!d)
552 : return NULL;
553 18366589 : while (d->next && !VALID_4_READ(d->cs.ts, tr))
554 63908 : d = d->next;
555 : return d;
556 : }
557 :
558 : static storage *
559 18278005 : tab_timestamp_storage( sql_trans *tr, sql_table *t)
560 : {
561 18278005 : return timestamp_storage( tr, ATOMIC_PTR_GET(&t->data));
562 : }
563 :
564 : static sql_delta*
565 19347 : delta_dup(sql_delta *d)
566 : {
567 19347 : ATOMIC_INC(&d->cs.refcnt);
568 19347 : return d;
569 : }
570 :
571 : static void *
572 17953 : col_dup(sql_column *c)
573 : {
574 17953 : return delta_dup(ATOMIC_PTR_GET(&c->data));
575 : }
576 :
577 : static void *
578 2681 : idx_dup(sql_idx *i)
579 : {
580 2681 : if (!ATOMIC_PTR_GET(&i->data))
581 : return NULL;
582 1394 : return delta_dup(ATOMIC_PTR_GET(&i->data));
583 : }
584 :
585 : static storage*
586 1531 : storage_dup(storage *d)
587 : {
588 1531 : ATOMIC_INC(&d->cs.refcnt);
589 1531 : return d;
590 : }
591 :
592 : static void *
593 1531 : del_dup(sql_table *t)
594 : {
595 1531 : return storage_dup(ATOMIC_PTR_GET(&t->data));
596 : }
597 :
598 : static size_t
599 17 : count_inserts( segment *s, sql_trans *tr)
600 : {
601 17 : size_t cnt = 0;
602 :
603 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
604 55 : if (!s->deleted && s->ts == tr->tid)
605 4 : cnt += s->end - s->start;
606 : }
607 17 : return cnt;
608 : }
609 :
610 : static size_t
611 852152 : count_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
612 : {
613 852152 : size_t cnt = 0;
614 :
615 997409 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
616 : ;
617 :
618 4949818 : for(;s && s->start < end; s = ATOMIC_PTR_GET(&s->next)) {
619 4097665 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
620 1325531 : cnt += s->end - s->start;
621 : }
622 852153 : return cnt;
623 : }
624 :
625 : static size_t
626 17 : count_deletes( segment *s, sql_trans *tr)
627 : {
628 17 : size_t cnt = 0;
629 :
630 72 : for(;s; s = ATOMIC_PTR_GET(&s->next)) {
631 55 : if (SEG_IS_DELETED(s, tr))
632 17 : cnt += s->end - s->start;
633 : }
634 17 : return cnt;
635 : }
636 :
637 : #define CNT_ACTIVE 10
638 :
639 : static size_t
640 16782724 : count_col(sql_trans *tr, sql_column *c, int access)
641 : {
642 16782724 : storage *d;
643 16782724 : sql_delta *ds;
644 :
645 16782724 : if (!isTable(c->t))
646 : return 0;
647 16782724 : d = tab_timestamp_storage(tr, c->t);
648 16780636 : ds = col_timestamp_delta(tr, c);
649 16780854 : if (!d ||!ds)
650 : return 0;
651 16780854 : if (access == 2)
652 458324 : return ds?ds->cs.ucnt:0;
653 16322530 : if (access == 1)
654 17 : return count_inserts(d->segs->h, tr);
655 16322513 : if (access == QUICK)
656 533328 : return d->segs->t?d->segs->t->end:0;
657 15789185 : if (access == CNT_ACTIVE) {
658 852121 : size_t cnt = segs_end(d->segs, tr, c->t);
659 852156 : lock_table(tr->store, c->t->base.id);
660 852151 : cnt -= count_deletes_in_range(d->segs->h, tr, 0, cnt);
661 852152 : unlock_table(tr->store, c->t->base.id);
662 852152 : return cnt;
663 : }
664 14937064 : return segs_end(d->segs, tr, c->t);
665 : }
666 :
667 : static size_t
668 18831 : count_idx(sql_trans *tr, sql_idx *i, int access)
669 : {
670 18831 : storage *d;
671 18831 : sql_delta *ds;
672 :
673 18831 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
674 4631 : return 0;
675 14200 : d = tab_timestamp_storage(tr, i->t);
676 14198 : ds = idx_timestamp_delta(tr, i);
677 14199 : if (!d || !ds)
678 : return 0;
679 14199 : if (access == 2)
680 2855 : return ds?ds->cs.ucnt:0;
681 11344 : if (access == 1)
682 0 : return count_inserts(d->segs->h, tr);
683 11344 : if (access == QUICK)
684 3528 : return d->segs->t?d->segs->t->end:0;
685 7816 : return segs_end(d->segs, tr, i->t);
686 : }
687 :
688 : #define BATtdense2(b) (b->ttype == TYPE_void && b->tseqbase != oid_nil)
689 : static BAT *
690 13346029 : cs_bind_ubat( column_storage *cs, int access, int type, size_t cnt /* ie max position < cnt */)
691 : {
692 13346029 : BAT *b;
693 :
694 13346029 : assert(access == RD_UPD_ID || access == RD_UPD_VAL);
695 : /* returns the updates for cs */
696 13346029 : if (cs->uibid && cs->uvbid && cs->ucnt) {
697 7670 : if (access == RD_UPD_ID) {
698 4942 : if (!(b = temp_descriptor(cs->uibid)))
699 : return NULL;
700 4942 : if (!b->tsorted || ((BATtdense2(b) && (b->tseqbase + BATcount(b)) >= cnt) ||
701 912 : (!BATtdense2(b) && BATcount(b) && ((oid*)b->theap->base)[BATcount(b)-1] >= cnt))) {
702 4030 : oid nil = oid_nil;
703 : /* less then cnt */
704 4030 : BAT *s = BATselect(b, NULL, &nil, &cnt, false, false, false);
705 4030 : if (!s) {
706 0 : bat_destroy(b);
707 0 : return NULL;
708 : }
709 :
710 4030 : BAT *nb = BATproject(s, b);
711 4030 : bat_destroy(s);
712 4030 : bat_destroy(b);
713 4030 : b = nb;
714 : }
715 : } else {
716 2728 : b = temp_descriptor(cs->uvbid);
717 : }
718 : } else {
719 22230553 : b = e_BAT(access == RD_UPD_ID?TYPE_oid:type);
720 : }
721 : return b;
722 : }
723 :
724 : static BAT *
725 0 : merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
726 : {
727 0 : int err = 0;
728 0 : BAT *uv = *UV;
729 0 : BUN cnt = BATcount(ui)+BATcount(oi);
730 0 : BATiter uvi;
731 0 : BATiter ovi;
732 :
733 0 : if (uv) {
734 0 : uvi = bat_iterator(uv);
735 0 : ovi = bat_iterator(ov);
736 : }
737 :
738 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
739 0 : BUN uip = 0, uie = BATcount(ui);
740 0 : BUN oip = 0, oie = BATcount(oi);
741 :
742 0 : oid uiseqb = ui->tseqbase;
743 0 : oid oiseqb = oi->tseqbase;
744 0 : oid *uipt = NULL, *oipt = NULL;
745 0 : BATiter uii = bat_iterator(ui);
746 0 : BATiter oii = bat_iterator(oi);
747 0 : if (!BATtdensebi(&uii))
748 0 : uipt = uii.base;
749 0 : if (!BATtdensebi(&oii))
750 0 : oipt = oii.base;
751 :
752 0 : if (uiseqb == oiseqb && uie == oie) { /* full overlap, no values */
753 0 : if (uv) {
754 0 : bat_iterator_end(&uvi);
755 0 : bat_iterator_end(&ovi);
756 : }
757 0 : bat_iterator_end(&uii);
758 0 : bat_iterator_end(&oii);
759 0 : if (uv) {
760 0 : *UV = uv;
761 : } else {
762 0 : bat_destroy(uv);
763 : }
764 0 : bat_destroy(oi);
765 0 : bat_destroy(ov);
766 0 : return ui;
767 : }
768 0 : BAT *ni = bat_new(TYPE_oid, cnt, SYSTRANS);
769 0 : BAT *nv = uv?bat_new(uv->ttype, cnt, SYSTRANS):NULL;
770 :
771 0 : if (!ni || (uv && !nv)) {
772 0 : bat_destroy(ni);
773 0 : bat_destroy(nv);
774 0 : bat_destroy(ui);
775 0 : bat_destroy(uv);
776 0 : bat_destroy(oi);
777 0 : bat_destroy(ov);
778 0 : return NULL;
779 : }
780 0 : while (uip < uie && oip < oie && !err) {
781 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
782 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
783 :
784 0 : if (uiid <= oiid) {
785 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
786 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
787 : err = 1;
788 0 : uip++;
789 0 : if (uiid == oiid)
790 0 : oip++;
791 : } else { /* uiid > oiid */
792 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
793 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
794 : err = 1;
795 0 : oip++;
796 : }
797 : }
798 0 : while (uip < uie && !err) {
799 0 : oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
800 0 : if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
801 0 : (ov && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
802 : err = 1;
803 0 : uip++;
804 : }
805 0 : while (oip < oie && !err) {
806 0 : oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
807 0 : if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
808 0 : (ov && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED) )
809 : err = 1;
810 0 : oip++;
811 : }
812 0 : if (uv) {
813 0 : bat_iterator_end(&uvi);
814 0 : bat_iterator_end(&ovi);
815 : }
816 0 : bat_iterator_end(&uii);
817 0 : bat_iterator_end(&oii);
818 0 : bat_destroy(ui);
819 0 : bat_destroy(uv);
820 0 : bat_destroy(oi);
821 0 : bat_destroy(ov);
822 0 : if (!err) {
823 0 : if (nv)
824 0 : *UV = nv;
825 0 : return ni;
826 : }
827 0 : *UV = NULL;
828 0 : bat_destroy(ni);
829 0 : bat_destroy(nv);
830 0 : return NULL;
831 : }
832 :
833 : static sql_delta *
834 8897355 : older_delta( sql_delta *d, sql_trans *tr)
835 : {
836 8897355 : sql_delta *o = d->next;
837 :
838 8902513 : while (o && !o->cs.merged) {
839 5153 : if (o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
840 : break;
841 : else
842 5158 : o = o->next;
843 : }
844 8897360 : if (o && !o->cs.merged && o->cs.ucnt && VALID_4_READ(o->cs.ts, tr))
845 0 : return o;
846 : return NULL;
847 : }
848 :
849 : static BAT *
850 8897194 : bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
851 : {
852 8897194 : assert(tr->active);
853 8897194 : sql_delta *o = NULL;
854 8897194 : BAT *ui = NULL, *uv = NULL;
855 :
856 8897194 : if (!(ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type, cnt)))
857 : return NULL;
858 8897291 : if (access == RD_UPD_VAL) {
859 4449071 : if (!(uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type, cnt))) {
860 0 : bat_destroy(ui);
861 0 : return NULL;
862 : }
863 : }
864 8897351 : while ((o = older_delta(d, tr)) != NULL) {
865 0 : BAT *oui = NULL, *ouv = NULL;
866 0 : if (!oui)
867 0 : oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
868 0 : if (access == RD_UPD_VAL)
869 0 : ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
870 0 : if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
871 0 : bat_destroy(ui);
872 0 : bat_destroy(uv);
873 0 : bat_destroy(oui);
874 0 : bat_destroy(ouv);
875 0 : return NULL;
876 : }
877 0 : if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
878 : return NULL;
879 : d = o;
880 : }
881 8897341 : if (uv) {
882 4449129 : bat_destroy(ui);
883 4449129 : return uv;
884 : }
885 : return ui;
886 : }
887 :
888 : static BAT *
889 519 : bind_ucol(sql_trans *tr, sql_column *c, int access, size_t cnt)
890 : {
891 519 : lock_column(tr->store, c->base.id);
892 519 : sql_delta *d = col_timestamp_delta(tr, c);
893 519 : int type = c->type.type->localtype;
894 :
895 519 : if (!d) {
896 0 : unlock_column(tr->store, c->base.id);
897 0 : return NULL;
898 : }
899 519 : if (d->cs.st == ST_DICT) {
900 0 : BAT *b = quick_descriptor(d->cs.bid);
901 :
902 0 : type = b->ttype;
903 : }
904 519 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
905 519 : unlock_column(tr->store, c->base.id);
906 519 : return bn;
907 : }
908 :
909 : static BAT *
910 0 : bind_uidx(sql_trans *tr, sql_idx * i, int access, size_t cnt)
911 : {
912 0 : lock_column(tr->store, i->base.id);
913 0 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
914 0 : sql_delta *d = idx_timestamp_delta(tr, i);
915 :
916 0 : if (!d) {
917 0 : unlock_column(tr->store, i->base.id);
918 0 : return NULL;
919 : }
920 0 : BAT *bn = bind_ubat(tr, d, access, type, cnt);
921 0 : unlock_column(tr->store, i->base.id);
922 0 : return bn;
923 : }
924 :
925 : static BAT *
926 8949327 : cs_bind_bat( column_storage *cs, int access, size_t cnt)
927 : {
928 8949327 : BAT *b;
929 :
930 8949327 : assert(access == RDONLY || access == QUICK || access == RD_EXT);
931 8949327 : assert(cs != NULL);
932 8949327 : if (access == QUICK)
933 136113 : return quick_descriptor(cs->bid);
934 8813214 : if (access == RD_EXT)
935 860 : return temp_descriptor(cs->ebid);
936 8812354 : assert(cs->bid);
937 8812354 : b = temp_descriptor(cs->bid);
938 8812391 : if (b == NULL)
939 : return NULL;
940 8812391 : assert(b->batRestricted == BAT_READ);
941 : /* return slice */
942 8812391 : BAT *s = BATslice(b, 0, cnt);
943 8812340 : bat_destroy(b);
944 8812340 : return s;
945 : }
946 :
947 : static int
948 4448247 : bind_updates(sql_trans *tr, sql_column *c, BAT **ui, BAT **uv)
949 : {
950 4448247 : lock_column(tr->store, c->base.id);
951 4448068 : size_t cnt = count_col(tr, c, 0);
952 4448409 : sql_delta *d = col_timestamp_delta(tr, c);
953 4448381 : int type = c->type.type->localtype;
954 :
955 4448381 : if (!d) {
956 0 : unlock_column(tr->store, c->base.id);
957 0 : return LOG_ERR;
958 : }
959 4448381 : if (d->cs.st == ST_DICT) {
960 2 : BAT *b = quick_descriptor(d->cs.bid);
961 :
962 2 : type = b->ttype;
963 : }
964 :
965 4448381 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
966 4448566 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
967 :
968 4448553 : unlock_column(tr->store, c->base.id);
969 :
970 4448520 : if (*ui == NULL || *uv == NULL) {
971 0 : bat_destroy(*ui);
972 0 : bat_destroy(*uv);
973 0 : return LOG_ERR;
974 : }
975 : return LOG_OK;
976 : }
977 :
978 : static int
979 57 : bind_updates_idx(sql_trans *tr, sql_idx *i, BAT **ui, BAT **uv)
980 : {
981 57 : lock_column(tr->store, i->base.id);
982 57 : size_t cnt = count_idx(tr, i, 0);
983 57 : sql_delta *d = idx_timestamp_delta(tr, i);
984 57 : int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
985 :
986 57 : if (!d) {
987 0 : unlock_column(tr->store, i->base.id);
988 0 : return LOG_ERR;
989 : }
990 :
991 57 : *ui = bind_ubat(tr, d, RD_UPD_ID, type, cnt);
992 57 : *uv = bind_ubat(tr, d, RD_UPD_VAL, type, cnt);
993 :
994 57 : unlock_column(tr->store, i->base.id);
995 :
996 57 : if (*ui == NULL || *uv == NULL) {
997 0 : bat_destroy(*ui);
998 0 : bat_destroy(*uv);
999 0 : return LOG_ERR;
1000 : }
1001 : return LOG_OK;
1002 : }
1003 :
1004 : static void * /* BAT * */
1005 8942681 : bind_col(sql_trans *tr, sql_column *c, int access)
1006 : {
1007 8942681 : assert(access == QUICK || tr->active);
1008 8942681 : if (!isTable(c->t))
1009 : return NULL;
1010 8942681 : sql_delta *d = col_timestamp_delta(tr, c);
1011 8942462 : if (!d)
1012 : return NULL;
1013 8942462 : size_t cnt = count_col(tr, c, 0);
1014 8942425 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1015 519 : return bind_ucol(tr, c, access, cnt);
1016 8941906 : BAT *b = cs_bind_bat( &d->cs, access, cnt);
1017 8942055 : assert(!b || ((c->storage_type && access != RD_EXT) || b->ttype == c->type.type->localtype) || (access == QUICK && b->ttype < 0));
1018 : return b;
1019 : }
1020 :
1021 : static void * /* BAT * */
1022 7361 : bind_idx(sql_trans *tr, sql_idx * i, int access)
1023 : {
1024 7361 : assert(access == QUICK || tr->active);
1025 7361 : if (!isTable(i->t))
1026 : return NULL;
1027 7361 : sql_delta *d = idx_timestamp_delta(tr, i);
1028 7361 : if (!d)
1029 : return NULL;
1030 7361 : size_t cnt = count_idx(tr, i, 0);
1031 7361 : if (access == RD_UPD_ID || access == RD_UPD_VAL)
1032 0 : return bind_uidx(tr, i, access, cnt);
1033 7361 : return cs_bind_bat( &d->cs, access, cnt);
1034 : }
1035 :
1036 : static int
1037 3935 : cs_real_update_bats( column_storage *cs, BAT **Ui, BAT **Uv)
1038 : {
1039 3935 : if (!cs->uibid) {
1040 0 : cs->uibid = e_bat(TYPE_oid);
1041 0 : if (cs->uibid == BID_NIL)
1042 : return LOG_ERR;
1043 : }
1044 3935 : if (!cs->uvbid) {
1045 0 : BAT *cur = quick_descriptor(cs->bid);
1046 0 : if (!cur)
1047 : return LOG_ERR;
1048 0 : int type = cur->ttype;
1049 0 : cs->uvbid = e_bat(type);
1050 0 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1051 : return LOG_ERR;
1052 : }
1053 3935 : BAT *ui = temp_descriptor(cs->uibid);
1054 3935 : BAT *uv = temp_descriptor(cs->uvbid);
1055 :
1056 3935 : if (ui == NULL || uv == NULL) {
1057 0 : bat_destroy(ui);
1058 0 : bat_destroy(uv);
1059 0 : return LOG_ERR;
1060 : }
1061 3935 : assert(ui && uv);
1062 3935 : if (isEbat(ui)){
1063 411 : temp_destroy(cs->uibid);
1064 411 : cs->uibid = temp_copy(ui->batCacheid, true, true);
1065 411 : bat_destroy(ui);
1066 411 : if (cs->uibid == BID_NIL ||
1067 411 : (ui = temp_descriptor(cs->uibid)) == NULL) {
1068 0 : bat_destroy(uv);
1069 0 : return LOG_ERR;
1070 : }
1071 : }
1072 3935 : if (isEbat(uv)){
1073 411 : temp_destroy(cs->uvbid);
1074 411 : cs->uvbid = temp_copy(uv->batCacheid, true, true);
1075 411 : bat_destroy(uv);
1076 411 : if (cs->uvbid == BID_NIL ||
1077 411 : (uv = temp_descriptor(cs->uvbid)) == NULL) {
1078 0 : bat_destroy(ui);
1079 0 : return LOG_ERR;
1080 : }
1081 : }
1082 3935 : *Ui = ui;
1083 3935 : *Uv = uv;
1084 3935 : return LOG_OK;
1085 : }
1086 :
1087 : static int
1088 6454 : segments_is_append(segment *s, sql_trans *tr, oid rid)
1089 : {
1090 93298 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1091 93298 : if (s->start <= rid && s->end > rid) {
1092 6454 : if (s->ts == tr->tid && !s->deleted) {
1093 2625 : return 1;
1094 : }
1095 : break;
1096 : }
1097 : }
1098 : return 0;
1099 : }
1100 :
1101 : static int
1102 3829 : segments_is_deleted(segment *s, sql_trans *tr, oid rid)
1103 : {
1104 87998 : for(; s; s=ATOMIC_PTR_GET(&s->next)) {
1105 87998 : if (s->start <= rid && s->end > rid) {
1106 3829 : if (s->ts >= tr->ts && s->deleted) {
1107 0 : return 1;
1108 : }
1109 : break;
1110 : }
1111 : }
1112 : return 0;
1113 : }
1114 :
1115 : static sql_delta *
1116 0 : tr_dup_delta(sql_trans *tr, sql_delta *bat)
1117 : {
1118 0 : sql_delta *n = ZNEW(sql_delta);
1119 0 : if (!n)
1120 : return NULL;
1121 0 : *n = *bat;
1122 0 : n->next = NULL;
1123 0 : n->cs.ts = tr->tid;
1124 0 : return n;
1125 : }
1126 :
1127 : static BAT *
1128 17 : dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
1129 : {
1130 17 : BAT *newoffsets = NULL;
1131 17 : sql_delta *bat = *batp;
1132 17 : column_storage *cs = &bat->cs;
1133 17 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1134 :
1135 17 : if (!u)
1136 : return NULL;
1137 17 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1138 17 : if (DICTprepare4append(&newoffsets, i, u) < 0) {
1139 0 : bat_destroy(u);
1140 0 : return NULL;
1141 : } else {
1142 17 : int new = 0;
1143 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1144 17 : if (BATcount(u) >= max_cnt) {
1145 1 : if (max_cnt == 64*1024) { /* decompress */
1146 0 : if (!(b = temp_descriptor(cs->bid))) {
1147 0 : bat_destroy(u);
1148 0 : return NULL;
1149 : }
1150 0 : if (cs->ucnt) {
1151 0 : BAT *ui = NULL, *uv = NULL;
1152 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1153 0 : bat_destroy(b);
1154 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1155 0 : bat_destroy(nb);
1156 0 : bat_destroy(u);
1157 0 : return NULL;
1158 : }
1159 0 : b = nb;
1160 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1161 0 : bat_destroy(ui);
1162 0 : bat_destroy(uv);
1163 0 : bat_destroy(b);
1164 0 : bat_destroy(u);
1165 : }
1166 0 : bat_destroy(ui);
1167 0 : bat_destroy(uv);
1168 : }
1169 0 : n = DICTdecompress_(b, u, PERSISTENT);
1170 0 : bat_destroy(b);
1171 0 : assert(newoffsets == NULL);
1172 0 : if (!n) {
1173 0 : bat_destroy(u);
1174 0 : return NULL;
1175 : }
1176 0 : if (cs->ts != tr->tid) {
1177 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1178 0 : bat_destroy(n);
1179 0 : return NULL;
1180 : }
1181 0 : cs = &(*batp)->cs;
1182 0 : new = 1;
1183 : }
1184 0 : if (cs->bid && !new)
1185 0 : temp_destroy(cs->bid);
1186 0 : n = transfer_to_systrans(n);
1187 0 : if (n == NULL)
1188 : return NULL;
1189 0 : bat_set_access(n, BAT_READ);
1190 0 : cs->bid = temp_create(n);
1191 0 : bat_destroy(n);
1192 0 : if (cs->ebid && !new)
1193 0 : temp_destroy(cs->ebid);
1194 0 : cs->ebid = 0;
1195 0 : cs->ucnt = 0;
1196 0 : if (cs->uibid && !new)
1197 0 : temp_destroy(cs->uibid);
1198 0 : if (cs->uvbid && !new)
1199 0 : temp_destroy(cs->uvbid);
1200 0 : cs->uibid = cs->uvbid = 0;
1201 0 : cs->st = ST_DEFAULT;
1202 0 : cs->cleared = true;
1203 : } else {
1204 1 : if (!(b = temp_descriptor(cs->bid))) {
1205 0 : bat_destroy(newoffsets);
1206 0 : bat_destroy(u);
1207 0 : return NULL;
1208 : }
1209 1 : n = DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i), PERSISTENT);
1210 1 : bat_destroy(b);
1211 1 : if (!n) {
1212 0 : bat_destroy(newoffsets);
1213 0 : bat_destroy(u);
1214 0 : return NULL;
1215 : }
1216 1 : if (cs->ts != tr->tid) {
1217 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1218 0 : bat_destroy(n);
1219 0 : return NULL;
1220 : }
1221 0 : cs = &(*batp)->cs;
1222 0 : new = 1;
1223 0 : temp_dup(cs->ebid);
1224 0 : if (cs->uibid) {
1225 0 : temp_dup(cs->uibid);
1226 0 : temp_dup(cs->uvbid);
1227 : }
1228 : }
1229 1 : if (cs->bid && !new)
1230 1 : temp_destroy(cs->bid);
1231 1 : n = transfer_to_systrans(n);
1232 1 : if (n == NULL)
1233 : return NULL;
1234 1 : bat_set_access(n, BAT_READ);
1235 1 : cs->bid = temp_create(n);
1236 1 : bat_destroy(n);
1237 1 : cs->cleared = true;
1238 1 : i = newoffsets;
1239 : }
1240 : } else { /* append */
1241 16 : i = newoffsets;
1242 : }
1243 : }
1244 17 : bat_destroy(u);
1245 17 : return i;
1246 : }
1247 :
1248 : static BAT *
1249 0 : for_append_bat(column_storage *cs, BAT *i, char *storage_type)
1250 : {
1251 0 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1252 0 : BAT *newoffsets = NULL;
1253 0 : BAT *b = NULL, *n = NULL;
1254 :
1255 0 : if (!(b = temp_descriptor(cs->bid)))
1256 : return NULL;
1257 :
1258 0 : if (FORprepare4append(&newoffsets, i, offsetval, b->ttype) < 0) {
1259 0 : bat_destroy(b);
1260 0 : return NULL;
1261 : } else {
1262 : /* returns new offset bat if values within min/max, else decompress */
1263 0 : if (!newoffsets) { /* decompress */
1264 0 : if (cs->ucnt) {
1265 0 : BAT *ui = NULL, *uv = NULL;
1266 0 : BAT *nb = COLcopy(b, b->ttype, true, SYSTRANS);
1267 0 : bat_destroy(b);
1268 0 : if (!nb || cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1269 0 : bat_destroy(nb);
1270 0 : return NULL;
1271 : }
1272 0 : b = nb;
1273 0 : if (BATupdate(b, ui, uv, true) != GDK_SUCCEED) {
1274 0 : bat_destroy(ui);
1275 0 : bat_destroy(uv);
1276 0 : bat_destroy(b);
1277 : }
1278 0 : bat_destroy(ui);
1279 0 : bat_destroy(uv);
1280 : }
1281 0 : n = FORdecompress_(b, offsetval, i->ttype, PERSISTENT);
1282 0 : bat_destroy(b);
1283 0 : if (!n)
1284 : return NULL;
1285 0 : if (cs->bid)
1286 0 : temp_destroy(cs->bid);
1287 0 : n = transfer_to_systrans(n);
1288 0 : if (n == NULL)
1289 : return NULL;
1290 0 : bat_set_access(n, BAT_READ);
1291 0 : cs->bid = temp_create(n);
1292 0 : cs->ucnt = 0;
1293 0 : if (cs->uibid)
1294 0 : temp_destroy(cs->uibid);
1295 0 : if (cs->uvbid)
1296 0 : temp_destroy(cs->uvbid);
1297 0 : cs->uibid = cs->uvbid = 0;
1298 0 : cs->st = ST_DEFAULT;
1299 0 : cs->cleared = true;
1300 0 : b = n;
1301 : } else { /* append */
1302 : i = newoffsets;
1303 : }
1304 : }
1305 0 : bat_destroy(b);
1306 0 : return i;
1307 : }
1308 :
1309 : /*
1310 : * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
1311 : */
1312 : static int
1313 3064 : cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT *updates, int is_new)
1314 : {
1315 3064 : int res = LOG_OK;
1316 3064 : sql_delta *bat = *batp;
1317 3064 : column_storage *cs = &bat->cs;
1318 3064 : BAT *otids = tids, *oupdates = updates;
1319 :
1320 3064 : if (!BATcount(tids))
1321 : return LOG_OK;
1322 :
1323 3064 : if (tids && (tids->ttype == TYPE_msk || mask_cand(tids))) {
1324 6 : tids = BATunmask(tids);
1325 6 : if (!tids)
1326 : return LOG_ERR;
1327 : }
1328 3064 : if (updates && (updates->ttype == TYPE_msk || mask_cand(updates))) {
1329 0 : updates = BATunmask(updates);
1330 0 : if (!updates) {
1331 0 : if (otids != tids)
1332 0 : bat_destroy(tids);
1333 0 : return LOG_ERR;
1334 : }
1335 3064 : } else if (updates && updates->ttype == TYPE_void && !complex_cand(updates)) { /* dense later use optimized log structure */
1336 43 : updates = COLcopy(updates, TYPE_oid, true /* make sure we get a oid col */, SYSTRANS);
1337 43 : if (!updates) {
1338 0 : if (otids != tids)
1339 0 : bat_destroy(tids);
1340 0 : return LOG_ERR;
1341 : }
1342 : }
1343 :
1344 3064 : if (cs->st == ST_DICT) {
1345 : /* possibly a new array is returned */
1346 4 : BAT *nupdates = dict_append_bat(tr, batp, updates);
1347 4 : bat = *batp;
1348 4 : cs = &bat->cs;
1349 4 : if (oupdates != updates)
1350 0 : bat_destroy(updates);
1351 4 : updates = nupdates;
1352 4 : if (!updates) {
1353 0 : if (otids != tids)
1354 0 : bat_destroy(tids);
1355 0 : return LOG_ERR;
1356 : }
1357 : }
1358 :
1359 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1360 : /* currently only one update delta is possible */
1361 3064 : lock_table(tr->store, t->base.id);
1362 3064 : storage *s = ATOMIC_PTR_GET(&t->data);
1363 3064 : if (!is_new && !cs->cleared) {
1364 2742 : if (!tids->tsorted /* make sure we have simple dense or oids */) {
1365 6 : BAT *sorted, *order;
1366 6 : if (BATsort(&sorted, &order, NULL, tids, NULL, NULL, false, false, false) != GDK_SUCCEED) {
1367 0 : if (otids != tids)
1368 0 : bat_destroy(tids);
1369 0 : if (oupdates != updates)
1370 0 : bat_destroy(updates);
1371 0 : unlock_table(tr->store, t->base.id);
1372 0 : return LOG_ERR;
1373 : }
1374 6 : if (otids != tids)
1375 0 : bat_destroy(tids);
1376 6 : tids = sorted;
1377 6 : BAT *nupdates = BATproject(order, updates);
1378 6 : bat_destroy(order);
1379 6 : if (oupdates != updates)
1380 0 : bat_destroy(updates);
1381 6 : updates = nupdates;
1382 6 : if (!updates) {
1383 0 : bat_destroy(tids);
1384 0 : unlock_table(tr->store, t->base.id);
1385 0 : return LOG_ERR;
1386 : }
1387 : }
1388 2742 : assert(tids->tsorted);
1389 2742 : BAT *ui = NULL, *uv = NULL;
1390 :
1391 : /* handle updates on just inserted bits */
1392 : /* handle updates on updates (within one transaction) */
1393 2742 : BATiter upi = bat_iterator(updates);
1394 2742 : BUN cnt = 0, ucnt = BATcount(tids);
1395 2742 : BAT *b, *ins = NULL;
1396 2742 : int *msk = NULL;
1397 :
1398 2742 : if((b = temp_descriptor(cs->bid)) == NULL)
1399 : res = LOG_ERR;
1400 :
1401 2742 : if (res == LOG_OK && BATtdense(tids)) {
1402 2507 : oid start = tids->tseqbase, offset = start;
1403 2507 : oid end = start + ucnt;
1404 :
1405 8398 : for(segment *seg = s->segs->h; seg && res == LOG_OK ; seg=ATOMIC_PTR_GET(&seg->next)) {
1406 6525 : if (seg->start <= start && seg->end > start) {
1407 : /* check for delete conflicts */
1408 2507 : if (seg->ts >= tr->ts && seg->deleted) {
1409 0 : res = LOG_CONFLICT;
1410 0 : continue;
1411 : }
1412 :
1413 : /* check for inplace updates */
1414 2507 : BUN lend = end < seg->end?end:seg->end;
1415 2507 : if (seg->ts == tr->tid && !seg->deleted) {
1416 133 : if (!ins) {
1417 133 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1418 133 : if (!ins)
1419 : res = LOG_ERR;
1420 : else {
1421 133 : BATsetcount(ins, ucnt); /* all full updates */
1422 133 : msk = (int*)Tloc(ins, 0);
1423 133 : BUN end = (ucnt+31)/32;
1424 133 : memset(msk, 0, end * sizeof(int));
1425 : }
1426 : }
1427 726 : for (oid i = 0, rid = start; rid < lend && res == LOG_OK; rid++, i++) {
1428 593 : const void *upd = BUNtail(upi, rid-offset);
1429 593 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1430 0 : res = LOG_ERR;
1431 :
1432 593 : oid word = i/32;
1433 593 : int pos = i%32;
1434 593 : msk[word] |= 1U<<pos;
1435 593 : cnt++;
1436 : }
1437 : }
1438 : }
1439 6525 : if (end < seg->end)
1440 : break;
1441 : }
1442 238 : } else if (res == LOG_OK && complex_cand(tids)) {
1443 3 : struct canditer ci;
1444 3 : segment *seg = s->segs->h;
1445 3 : canditer_init(&ci, NULL, tids);
1446 3 : BUN i = 0;
1447 1036 : while ( seg && res == LOG_OK && i < ucnt) {
1448 1033 : oid rid = canditer_next(&ci);
1449 1033 : if (seg->end <= rid)
1450 13 : seg = ATOMIC_PTR_GET(&seg->next);
1451 1020 : else if (seg->start <= rid && seg->end > rid) {
1452 : /* check for delete conflicts */
1453 1020 : if (seg->ts >= tr->ts && seg->deleted) {
1454 0 : res = LOG_CONFLICT;
1455 0 : continue;
1456 : }
1457 :
1458 : /* check for inplace updates */
1459 1020 : if (seg->ts == tr->tid && !seg->deleted) {
1460 0 : if (!ins) {
1461 0 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1462 0 : if (!ins) {
1463 : res = LOG_ERR;
1464 : break;
1465 : } else {
1466 0 : BATsetcount(ins, ucnt); /* all full updates */
1467 0 : msk = (int*)Tloc(ins, 0);
1468 0 : BUN end = (ucnt+31)/32;
1469 0 : memset(msk, 0, end * sizeof(int));
1470 : }
1471 : }
1472 0 : ptr upd = BUNtail(upi, i);
1473 0 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED)
1474 0 : res = LOG_ERR;
1475 :
1476 0 : oid word = i/32;
1477 0 : int pos = i%32;
1478 0 : msk[word] |= 1U<<pos;
1479 0 : cnt++;
1480 : }
1481 1020 : i++;
1482 : }
1483 : }
1484 232 : } else if (res == LOG_OK) {
1485 232 : BUN i = 0;
1486 232 : oid *rid = Tloc(tids,0);
1487 232 : segment *seg = s->segs->h;
1488 28097 : while ( seg && res == LOG_OK && i < ucnt) {
1489 27865 : if (seg->end <= rid[i])
1490 8113 : seg = ATOMIC_PTR_GET(&seg->next);
1491 19752 : else if (seg->start <= rid[i] && seg->end > rid[i]) {
1492 : /* check for delete conflicts */
1493 19752 : if (seg->ts >= tr->ts && seg->deleted) {
1494 0 : res = LOG_CONFLICT;
1495 0 : continue;
1496 : }
1497 :
1498 : /* check for inplace updates */
1499 19752 : if (seg->ts == tr->tid && !seg->deleted) {
1500 417 : if (!ins) {
1501 47 : ins = COLnew(0, TYPE_msk, ucnt, SYSTRANS);
1502 47 : if (!ins) {
1503 : res = LOG_ERR;
1504 : break;
1505 : } else {
1506 47 : BATsetcount(ins, ucnt); /* all full updates */
1507 47 : msk = (int*)Tloc(ins, 0);
1508 47 : BUN end = (ucnt+31)/32;
1509 47 : memset(msk, 0, end * sizeof(int));
1510 : }
1511 : }
1512 417 : const void *upd = BUNtail(upi, i);
1513 417 : if (void_inplace(b, rid[i], upd, true) != GDK_SUCCEED)
1514 0 : res = LOG_ERR;
1515 :
1516 417 : oid word = i/32;
1517 417 : int pos = i%32;
1518 417 : msk[word] |= 1U<<pos;
1519 417 : cnt++;
1520 : }
1521 19752 : i++;
1522 : }
1523 : }
1524 : }
1525 :
1526 2742 : if (res == LOG_OK && cnt < ucnt) { /* now handle real updates */
1527 2562 : if (cs->ucnt == 0) {
1528 2456 : if (cnt) {
1529 0 : BAT *nins = BATmaskedcands(0, ucnt, ins, false);
1530 0 : if (nins) {
1531 0 : ui = BATproject(nins, tids);
1532 0 : uv = BATproject(nins, updates);
1533 0 : bat_destroy(nins);
1534 : }
1535 : } else {
1536 2456 : ui = temp_descriptor(tids->batCacheid);
1537 2456 : uv = temp_descriptor(updates->batCacheid);
1538 : }
1539 2456 : if (!ui || !uv) {
1540 : res = LOG_ERR;
1541 : } else {
1542 2456 : temp_destroy(cs->uibid);
1543 2456 : temp_destroy(cs->uvbid);
1544 2456 : ui = transfer_to_systrans(ui);
1545 2456 : uv = transfer_to_systrans(uv);
1546 2456 : if (ui == NULL || uv == NULL) {
1547 0 : BBPreclaim(ui);
1548 0 : BBPreclaim(uv);
1549 : res = LOG_ERR;
1550 : } else {
1551 2456 : cs->uibid = temp_create(ui);
1552 2456 : cs->uvbid = temp_create(uv);
1553 2456 : cs->ucnt = BATcount(ui);
1554 : }
1555 : }
1556 : } else {
1557 106 : BAT *nui = NULL, *nuv = NULL;
1558 :
1559 : /* merge taking msk of inserted into account */
1560 106 : if (res == LOG_OK && cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
1561 : res = LOG_ERR;
1562 :
1563 106 : if (res == LOG_OK) {
1564 106 : const void *upd = NULL;
1565 106 : nui = bat_new(TYPE_oid, cs->ucnt + ucnt - cnt, SYSTRANS);
1566 106 : nuv = bat_new(uv->ttype, cs->ucnt + ucnt - cnt, SYSTRANS);
1567 :
1568 106 : if (!nui || !nuv) {
1569 : res = LOG_ERR;
1570 : } else {
1571 106 : BATiter ovi = bat_iterator(uv);
1572 :
1573 : /* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
1574 106 : BUN uip = 0, uie = BATcount(ui);
1575 106 : BUN nip = 0, nie = BATcount(tids);
1576 106 : oid uiseqb = ui->tseqbase;
1577 106 : oid niseqb = tids->tseqbase;
1578 106 : oid *uipt = NULL, *nipt = NULL;
1579 106 : BATiter uii = bat_iterator(ui);
1580 106 : BATiter tidsi = bat_iterator(tids);
1581 106 : if (!BATtdensebi(&uii))
1582 105 : uipt = uii.base;
1583 106 : if (!BATtdensebi(&tidsi))
1584 105 : nipt = tidsi.base;
1585 26789 : while (uip < uie && nip < nie && res == LOG_OK) {
1586 26683 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1587 26683 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1588 :
1589 26683 : if (uiv < niv) {
1590 15068 : upd = BUNtail(ovi, uip);
1591 30136 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1592 15068 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1593 : res = LOG_ERR;
1594 15068 : uip++;
1595 11615 : } else if (uiv == niv) {
1596 : /* handle == */
1597 18 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1598 18 : upd = BUNtail(upi, nip);
1599 36 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1600 18 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1601 : res = LOG_ERR;
1602 : } else {
1603 0 : upd = BUNtail(ovi, uip);
1604 0 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1605 0 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1606 : res = LOG_ERR;
1607 : }
1608 18 : uip++;
1609 18 : nip++;
1610 : } else { /* uiv > niv */
1611 11597 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1612 11597 : upd = BUNtail(upi, nip);
1613 23194 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1614 11597 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1615 : res = LOG_ERR;
1616 : }
1617 11597 : nip++;
1618 : }
1619 : }
1620 542 : while (uip < uie && res == LOG_OK) {
1621 436 : oid uiv = (uipt)?uipt[uip]: uiseqb+uip;
1622 436 : upd = BUNtail(ovi, uip);
1623 872 : if (BUNappend(nui, (ptr) &uiv, true) != GDK_SUCCEED ||
1624 436 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1625 : res = LOG_ERR;
1626 436 : uip++;
1627 : }
1628 799 : while (nip < nie && res == LOG_OK) {
1629 693 : oid niv = (nipt)?nipt[nip]: niseqb+nip;
1630 693 : if (!msk || (msk[nip/32] & (1U<<(nip%32))) == 0) {
1631 693 : upd = BUNtail(upi, nip);
1632 1386 : if (BUNappend(nui, (ptr) &niv, true) != GDK_SUCCEED ||
1633 693 : BUNappend(nuv, (ptr) upd, true) != GDK_SUCCEED)
1634 : res = LOG_ERR;
1635 : }
1636 693 : nip++;
1637 : }
1638 106 : bat_iterator_end(&uii);
1639 106 : bat_iterator_end(&tidsi);
1640 106 : bat_iterator_end(&ovi);
1641 106 : if (res == LOG_OK) {
1642 106 : temp_destroy(cs->uibid);
1643 106 : temp_destroy(cs->uvbid);
1644 106 : nui = transfer_to_systrans(nui);
1645 106 : nuv = transfer_to_systrans(nuv);
1646 106 : if (nui == NULL || nuv == NULL) {
1647 : res = LOG_ERR;
1648 : } else {
1649 106 : cs->uibid = temp_create(nui);
1650 106 : cs->uvbid = temp_create(nuv);
1651 106 : cs->ucnt = BATcount(nui);
1652 : }
1653 : }
1654 : }
1655 106 : bat_destroy(nui);
1656 106 : bat_destroy(nuv);
1657 : }
1658 : }
1659 : }
1660 2742 : bat_iterator_end(&upi);
1661 2742 : bat_destroy(b);
1662 2742 : unlock_table(tr->store, t->base.id);
1663 2742 : bat_destroy(ins);
1664 2742 : bat_destroy(ui);
1665 2742 : bat_destroy(uv);
1666 2742 : if (otids != tids)
1667 10 : bat_destroy(tids);
1668 2742 : if (oupdates != updates)
1669 12 : bat_destroy(updates);
1670 2742 : return res;
1671 : } else if (is_new || cs->cleared) {
1672 322 : BAT *b = temp_descriptor(cs->bid);
1673 :
1674 322 : if (b == NULL) {
1675 : res = LOG_ERR;
1676 : } else {
1677 322 : if (BATcount(b)==0) {
1678 1 : if (BATappend(b, updates, NULL, true) != GDK_SUCCEED) /* alter add column */
1679 0 : res = LOG_ERR;
1680 321 : } else if (BATreplace(b, tids, updates, true) != GDK_SUCCEED)
1681 0 : res = LOG_ERR;
1682 322 : BBPcold(b->batCacheid);
1683 322 : bat_destroy(b);
1684 : }
1685 : }
1686 322 : unlock_table(tr->store, t->base.id);
1687 322 : if (otids != tids)
1688 2 : bat_destroy(tids);
1689 322 : if (oupdates != updates)
1690 41 : bat_destroy(updates);
1691 : return res;
1692 : }
1693 :
1694 : static int
1695 3064 : delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT *updates, int is_new)
1696 : {
1697 3064 : return cs_update_bat(tr, bat, t, tids, updates, is_new);
1698 : }
1699 :
1700 : static void *
1701 4 : dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
1702 : {
1703 4 : void *newoffsets = NULL;
1704 4 : sql_delta *bat = *batp;
1705 4 : column_storage *cs = &bat->cs;
1706 4 : BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
1707 :
1708 4 : if (!u)
1709 : return NULL;
1710 4 : BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
1711 4 : if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
1712 0 : bat_destroy(u);
1713 0 : return NULL;
1714 : } else {
1715 4 : int new = 0;
1716 : /* returns new offset bat (ie to be appended), possibly with larger type ! */
1717 4 : if (BATcount(u) >= max_cnt) {
1718 0 : if (max_cnt == 64*1024) { /* decompress */
1719 0 : if (!(b = temp_descriptor(cs->bid))) {
1720 0 : bat_destroy(u);
1721 0 : return NULL;
1722 : }
1723 0 : n = DICTdecompress_(b, u, PERSISTENT);
1724 : /* TODO decompress updates if any */
1725 0 : bat_destroy(b);
1726 0 : assert(newoffsets == NULL);
1727 0 : if (!n) {
1728 0 : bat_destroy(u);
1729 0 : return NULL;
1730 : }
1731 0 : if (cs->ts != tr->tid) {
1732 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1733 0 : bat_destroy(n);
1734 0 : bat_destroy(u);
1735 0 : return NULL;
1736 : }
1737 0 : cs = &(*batp)->cs;
1738 0 : new = 1;
1739 0 : cs->uibid = cs->uvbid = 0;
1740 : }
1741 0 : if (cs->bid && !new)
1742 0 : temp_destroy(cs->bid);
1743 0 : n = transfer_to_systrans(n);
1744 0 : if (n == NULL) {
1745 0 : bat_destroy(u);
1746 0 : return NULL;
1747 : }
1748 0 : bat_set_access(n, BAT_READ);
1749 0 : cs->bid = temp_create(n);
1750 0 : bat_destroy(n);
1751 0 : if (cs->ebid && !new)
1752 0 : temp_destroy(cs->ebid);
1753 0 : cs->ebid = 0;
1754 0 : cs->st = ST_DEFAULT;
1755 : /* at append_col the column's storage type is cleared */
1756 0 : cs->cleared = true;
1757 : } else {
1758 0 : if (!(b = temp_descriptor(cs->bid))) {
1759 0 : GDKfree(newoffsets);
1760 0 : bat_destroy(u);
1761 0 : return NULL;
1762 : }
1763 0 : n = DICTenlarge(b, BATcount(b), BATcount(b) + cnt, PERSISTENT);
1764 0 : bat_destroy(b);
1765 0 : if (!n) {
1766 0 : GDKfree(newoffsets);
1767 0 : bat_destroy(u);
1768 0 : return NULL;
1769 : }
1770 0 : if (cs->ts != tr->tid) {
1771 0 : if ((*batp = tr_dup_delta(tr, bat)) == NULL) {
1772 0 : bat_destroy(u);
1773 0 : bat_destroy(n);
1774 0 : return NULL;
1775 : }
1776 0 : cs = &(*batp)->cs;
1777 0 : new = 1;
1778 0 : temp_dup(cs->ebid);
1779 0 : if (cs->uibid) {
1780 0 : temp_dup(cs->uibid);
1781 0 : temp_dup(cs->uvbid);
1782 : }
1783 : }
1784 0 : if (cs->bid)
1785 0 : temp_destroy(cs->bid);
1786 0 : n = transfer_to_systrans(n);
1787 0 : if (n == NULL) {
1788 0 : bat_destroy(u);
1789 0 : return NULL;
1790 : }
1791 0 : bat_set_access(n, BAT_READ);
1792 0 : cs->bid = temp_create(n);
1793 0 : bat_destroy(n);
1794 0 : cs->cleared = true;
1795 0 : i = newoffsets;
1796 : }
1797 : } else { /* append */
1798 4 : i = newoffsets;
1799 : }
1800 : }
1801 4 : bat_destroy(u);
1802 4 : return i;
1803 : }
1804 :
1805 : static void *
1806 1 : for_append_val(column_storage *cs, void *i, BUN cnt, char *storage_type, int tt)
1807 : {
1808 1 : lng offsetval = strtoll(storage_type+4, NULL, 10);
1809 1 : void *newoffsets = NULL;
1810 1 : BAT *b = NULL, *n = NULL;
1811 :
1812 1 : if (!(b = temp_descriptor(cs->bid)))
1813 : return NULL;
1814 :
1815 1 : if (FORprepare4append_vals(&newoffsets, i, cnt, offsetval, tt, b->ttype) < 0) {
1816 0 : bat_destroy(b);
1817 0 : return NULL;
1818 : } else {
1819 : /* returns new offset bat if values within min/max, else decompress */
1820 1 : if (!newoffsets) {
1821 1 : n = FORdecompress_(b, offsetval, tt, PERSISTENT);
1822 1 : bat_destroy(b);
1823 1 : if (!n)
1824 : return NULL;
1825 : /* TODO decompress updates if any */
1826 1 : if (cs->bid)
1827 1 : temp_destroy(cs->bid);
1828 1 : n = transfer_to_systrans(n);
1829 1 : if (n == NULL)
1830 : return NULL;
1831 1 : bat_set_access(n, BAT_READ);
1832 1 : cs->bid = temp_create(n);
1833 1 : cs->st = ST_DEFAULT;
1834 : /* at append_col the column's storage type is cleared */
1835 1 : cs->cleared = true;
1836 1 : b = n;
1837 : } else { /* append */
1838 : i = newoffsets;
1839 : }
1840 : }
1841 1 : bat_destroy(b);
1842 1 : return i;
1843 : }
1844 :
1845 : static int
1846 6454 : cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void *upd, int is_new)
1847 : {
1848 6454 : void *oupd = upd;
1849 6454 : sql_delta *bat = *batp;
1850 6454 : column_storage *cs = &bat->cs;
1851 6454 : storage *s = ATOMIC_PTR_GET(&t->data);
1852 6454 : assert(!is_oid_nil(rid));
1853 6454 : int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, tr, rid);
1854 :
1855 6454 : if (cs->st == ST_DICT) {
1856 : /* possibly a new array is returned */
1857 0 : upd = dict_append_val(tr, batp, upd, 1);
1858 0 : bat = *batp;
1859 0 : cs = &bat->cs;
1860 0 : if (!upd)
1861 : return LOG_ERR;
1862 : }
1863 :
1864 : /* check if rid is insert ? */
1865 6454 : if (!inplace) {
1866 : /* check conflict */
1867 3829 : if (segments_is_deleted(s->segs->h, tr, rid)) {
1868 0 : if (oupd != upd)
1869 0 : GDKfree(upd);
1870 0 : return LOG_CONFLICT;
1871 : }
1872 3829 : BAT *ui, *uv;
1873 :
1874 : /* When we go to smaller grained update structures we should check for concurrent updates on this column ! */
1875 : /* currently only one update delta is possible */
1876 3829 : if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
1877 0 : if (oupd != upd)
1878 0 : GDKfree(upd);
1879 0 : return LOG_ERR;
1880 : }
1881 :
1882 3829 : assert(uv->ttype);
1883 3829 : assert(BATcount(ui) == BATcount(uv));
1884 7658 : if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
1885 3829 : BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
1886 0 : if (oupd != upd)
1887 0 : GDKfree(upd);
1888 0 : bat_destroy(ui);
1889 0 : bat_destroy(uv);
1890 0 : return LOG_ERR;
1891 : }
1892 3829 : assert(BATcount(ui) == BATcount(uv));
1893 3829 : bat_destroy(ui);
1894 3829 : bat_destroy(uv);
1895 3829 : cs->ucnt++;
1896 : } else {
1897 2625 : BAT *b = NULL;
1898 :
1899 2625 : if((b = temp_descriptor(cs->bid)) == NULL) {
1900 0 : if (oupd != upd)
1901 0 : GDKfree(upd);
1902 0 : return LOG_ERR;
1903 : }
1904 2625 : if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
1905 0 : if (oupd != upd)
1906 0 : GDKfree(upd);
1907 0 : bat_destroy(b);
1908 0 : return LOG_ERR;
1909 : }
1910 2625 : bat_destroy(b);
1911 : }
1912 6454 : if (oupd != upd)
1913 0 : GDKfree(upd);
1914 : return LOG_OK;
1915 : }
1916 :
1917 : static int
1918 6454 : delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void *upd, int is_new)
1919 : {
1920 6454 : int res = LOG_OK;
1921 6454 : lock_table(tr->store, t->base.id);
1922 6454 : res = cs_update_val(tr, bat, t, rid, upd, is_new);
1923 6454 : unlock_table(tr->store, t->base.id);
1924 6454 : return res;
1925 : }
1926 :
1927 : static int
1928 158980 : dup_cs(sql_trans *tr, column_storage *ocs, column_storage *cs, int type, int temp)
1929 : {
1930 158980 : (void)tr;
1931 158980 : if (!ocs)
1932 : return LOG_OK;
1933 158980 : cs->bid = ocs->bid;
1934 158980 : cs->ebid = ocs->ebid;
1935 158980 : cs->uibid = ocs->uibid;
1936 158980 : cs->uvbid = ocs->uvbid;
1937 158980 : cs->ucnt = ocs->ucnt;
1938 :
1939 158980 : if (temp) {
1940 25983 : cs->bid = temp_copy(cs->bid, true, false);
1941 25983 : if (cs->bid == BID_NIL)
1942 : return LOG_ERR;
1943 : } else {
1944 132997 : temp_dup(cs->bid);
1945 : }
1946 158983 : if (cs->ebid)
1947 6 : temp_dup(cs->ebid);
1948 158983 : cs->ucnt = 0;
1949 158983 : cs->uibid = e_bat(TYPE_oid);
1950 158987 : cs->uvbid = e_bat(type);
1951 158989 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
1952 : return LOG_ERR;
1953 158989 : cs->st = ocs->st;
1954 158989 : return LOG_OK;
1955 : }
1956 :
1957 : static void
1958 312620 : destroy_delta(sql_delta *b, bool recursive)
1959 : {
1960 312620 : if (ATOMIC_DEC(&b->cs.refcnt) > 0)
1961 : return;
1962 293273 : if (recursive && b->next)
1963 71886 : destroy_delta(b->next, true);
1964 293273 : if (b->cs.uibid)
1965 95439 : temp_destroy(b->cs.uibid);
1966 293273 : if (b->cs.uvbid)
1967 95439 : temp_destroy(b->cs.uvbid);
1968 293273 : if (b->cs.bid)
1969 293273 : temp_destroy(b->cs.bid);
1970 293273 : if (b->cs.ebid)
1971 61 : temp_destroy(b->cs.ebid);
1972 293273 : b->cs.bid = b->cs.ebid = b->cs.uibid = b->cs.uvbid = 0;
1973 293273 : _DELETE(b);
1974 : }
1975 :
/*
 * Return the delta of column c that transaction tr may modify.
 *
 * update_conflict == NULL selects the append path (see the inline comment:
 * "on append there are no conflicts"); otherwise it is an update and
 * *update_conflict is set to true when another transaction owns the head
 * delta or wins the race to install a new one.
 *
 * If the head delta already belongs to tr, it is returned.  Otherwise a new
 * delta, stamped with tr->tid and sharing the visible version's storage
 * (dup_cs with temp == 0), is pushed onto c->data with a compare-and-swap.
 * Returns NULL on error or conflict.
 */
static sql_delta *
bind_col_data(sql_trans *tr, sql_column *c, bool *update_conflict)
{
	sql_delta *obat = ATOMIC_PTR_GET(&c->data);

	/* own delta, or (append path only) a delta that is committed
	 * (ts < TRANSACTION_ID_BASE, presumably) or belongs to a parent transaction */
	if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
		return obat;
	/* head delta stamped with another (apparently uncommitted) transaction id */
	if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
		/* abort */
		if (update_conflict)
			*update_conflict = true;
		else if (!obat->cs.cleared) /* concurrent appends are only allowed on concurrent updates */
			return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
		return NULL;
	}
	/* the version visible to tr becomes the base of a new, private delta */
	if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&c->data))))
		return NULL;
	sql_delta* bat = ZNEW(sql_delta);
	if (!bat)
		return NULL;
	ATOMIC_INIT(&bat->cs.refcnt, 1);
	if (dup_cs(tr, &obat->cs, &bat->cs, c->type.type->localtype, 0) != LOG_OK) {
		destroy_delta(bat, false);
		return NULL;
	}
	bat->cs.ts = tr->tid;
	/* only one writer else abort */
	bat->next = obat;
	/* install bat as the new head only if c->data still equals obat */
	if (!ATOMIC_PTR_CAS(&c->data, (void**)&bat->next, bat)) {
		bat->next = NULL;
		destroy_delta(bat, false);
		if (update_conflict)
			*update_conflict = true;
		return NULL;
	}
	return bat;
}
2013 :
2014 : static int
2015 9518 : update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool is_new, void *incoming_tids, void *incoming_values, bool is_bat)
2016 : {
2017 9518 : int ok = LOG_OK;
2018 :
2019 9518 : if (is_bat) {
2020 3064 : BAT *tids = incoming_tids;
2021 3064 : BAT *values = incoming_values;
2022 3064 : if (BATcount(tids) == 0)
2023 : return LOG_OK;
2024 3064 : ok = delta_update_bat(tr, delta, table, tids, values, is_new);
2025 : } else {
2026 6454 : ok = delta_update_val(tr, delta, table, *(oid*)incoming_tids, incoming_values, is_new);
2027 : }
2028 : return ok;
2029 : }
2030 :
2031 : static int
2032 9766 : update_col(sql_trans *tr, sql_column *c, void *tids, void *upd, bool isbat)
2033 : {
2034 9766 : int res = LOG_OK;
2035 9766 : bool update_conflict = false;
2036 9766 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2037 :
2038 9766 : if (isbat) {
2039 3309 : BAT *t = tids;
2040 3309 : if (!BATcount(t))
2041 : return LOG_OK;
2042 : }
2043 :
2044 9243 : if (c == NULL)
2045 : return LOG_ERR;
2046 :
2047 9243 : if ((delta = bind_col_data(tr, c, &update_conflict)) == NULL)
2048 4 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2049 :
2050 9239 : assert(delta && delta->cs.ts == tr->tid);
2051 9239 : assert(c->t->persistence != SQL_DECLARED_TABLE);
2052 9239 : if (odelta != delta)
2053 3449 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
2054 :
2055 9239 : odelta = delta;
2056 9239 : if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, isbat)) != LOG_OK)
2057 : return res;
2058 9239 : assert(delta == odelta);
2059 9239 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2060 0 : res = sql_trans_alter_storage(tr, c, NULL);
2061 : return res;
2062 : }
2063 :
2064 : static sql_delta *
2065 2457 : bind_idx_data(sql_trans *tr, sql_idx *i, bool *update_conflict)
2066 : {
2067 2457 : sql_delta *obat = ATOMIC_PTR_GET(&i->data);
2068 :
2069 2457 : if (obat->cs.ts == tr->tid || ((obat->cs.ts < TRANSACTION_ID_BASE || tr_version_of_parent(tr, obat->cs.ts)) && !update_conflict)) /* on append there are no conflicts */
2070 2429 : return obat;
2071 28 : if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE) {
2072 : /* abort */
2073 0 : if (update_conflict)
2074 0 : *update_conflict = true;
2075 0 : return NULL;
2076 : }
2077 28 : if (!(obat = timestamp_delta(tr, ATOMIC_PTR_GET(&i->data))))
2078 : return NULL;
2079 28 : sql_delta* bat = ZNEW(sql_delta);
2080 28 : if (!bat)
2081 : return NULL;
2082 28 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2083 33 : if (dup_cs(tr, &obat->cs, &bat->cs, (oid_index(i->type))?TYPE_oid:TYPE_lng, 0) != LOG_OK) {
2084 0 : destroy_delta(bat, false);
2085 0 : return NULL;
2086 : }
2087 28 : bat->cs.ts = tr->tid;
2088 : /* only one writer else abort */
2089 28 : bat->next = obat;
2090 28 : if (!ATOMIC_PTR_CAS(&i->data, (void**)&bat->next, bat)) {
2091 0 : bat->next = NULL;
2092 0 : destroy_delta(bat, false);
2093 0 : if (update_conflict)
2094 0 : *update_conflict = true;
2095 0 : return NULL;
2096 : }
2097 : return bat;
2098 : }
2099 :
2100 : static int
2101 782 : update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, bool isbat)
2102 : {
2103 782 : int res = LOG_OK;
2104 782 : bool update_conflict = false;
2105 782 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
2106 :
2107 782 : if (isbat) {
2108 782 : BAT *t = tids;
2109 782 : if (!BATcount(t))
2110 : return LOG_OK;
2111 : }
2112 :
2113 279 : if (i == NULL)
2114 : return LOG_ERR;
2115 :
2116 279 : if ((delta = bind_idx_data(tr, i, &update_conflict)) == NULL)
2117 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
2118 :
2119 279 : assert(delta && delta->cs.ts == tr->tid);
2120 279 : if (odelta != delta)
2121 22 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
2122 :
2123 279 : odelta = delta;
2124 279 : res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, isbat);
2125 279 : assert(delta == odelta);
2126 : return res;
2127 : }
2128 :
2129 : static int
2130 150147 : delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, BAT *i, char *storage_type, bool istemp)
2131 : {
2132 150147 : BAT *b, *oi = i;
2133 150147 : int err = 0;
2134 150147 : sql_delta *bat = *batp;
2135 :
2136 150147 : assert(!offsets || BATcount(offsets) == BATcount(i));
2137 150147 : if (!BATcount(i))
2138 : return LOG_OK;
2139 150147 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(oi = BATunmask(i)))
2140 : return LOG_ERR;
2141 :
2142 150147 : lock_column(tr->store, id);
2143 150139 : if (bat->cs.st == ST_DICT) {
2144 13 : BAT *ni = dict_append_bat(tr, batp, oi);
2145 13 : bat = *batp;
2146 13 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2147 0 : bat_destroy(oi);
2148 13 : oi = ni;
2149 13 : if (!oi) {
2150 0 : unlock_column(tr->store, id);
2151 0 : return LOG_ERR;
2152 : }
2153 : }
2154 150139 : if (bat->cs.st == ST_FOR) {
2155 0 : BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
2156 0 : bat = *batp;
2157 0 : if (oi != i) /* oi will be replaced, so destroy possible unmask reference */
2158 0 : bat_destroy(oi);
2159 0 : oi = ni;
2160 0 : if (!oi) {
2161 0 : unlock_column(tr->store, id);
2162 0 : return LOG_ERR;
2163 : }
2164 : }
2165 :
2166 150139 : b = temp_descriptor(bat->cs.bid);
2167 150110 : if (b == NULL) {
2168 0 : unlock_column(tr->store, id);
2169 0 : if (oi != i)
2170 0 : bat_destroy(oi);
2171 0 : return LOG_ERR;
2172 : }
2173 150110 : if (istemp && !offsets && offset == 0 && BATcount(b) == 0 && bat->cs.ucnt == 0) {
2174 14 : bat_set_access(i, BAT_READ);
2175 14 : if (bat->cs.bid)
2176 14 : temp_destroy(bat->cs.bid);
2177 14 : i = transfer_to_systrans(i);
2178 14 : bat->cs.bid = temp_create(i);
2179 150096 : } else if (!offsets && offset == b->hseqbase+BATcount(b)) {
2180 149908 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2181 2 : err = 1;
2182 176 : } else if (!offsets) {
2183 176 : if (BATupdatepos(b, &offset, oi, true, true) != GDK_SUCCEED)
2184 2 : err = 1;
2185 12 : } else if ((BATtdense(offsets) && offsets->tseqbase == (b->hseqbase+BATcount(b)))) {
2186 0 : if (BATappend(b, oi, NULL, true) != GDK_SUCCEED)
2187 2 : err = 1;
2188 12 : } else if (BATupdate(b, offsets, oi, true) != GDK_SUCCEED) {
2189 0 : err = 1;
2190 : }
2191 150132 : bat_destroy(b);
2192 150148 : unlock_column(tr->store, id);
2193 :
2194 150138 : if (oi != i)
2195 13 : bat_destroy(oi);
2196 150138 : return (err)?LOG_ERR:LOG_OK;
2197 : }
2198 :
2199 : // Look at the offsets and find where the replacements end and the appends begin.
2200 : static BUN
2201 0 : start_of_appends(BAT *offsets, BUN bcnt)
2202 : {
2203 0 : BUN ocnt = BATcount(offsets);
2204 0 : if (ocnt == 0)
2205 : return 0;
2206 :
2207 0 : BUN highest = *(oid*)Tloc(offsets, ocnt - 1);
2208 0 : if (highest < bcnt)
2209 : // all are replacements
2210 : return ocnt;
2211 :
2212 : // reason backward to find the first append.
2213 : // Suppose offsets has 15 entries, bcnt == 100
2214 : // and the highest offset in offsets is 109.
2215 0 : BUN new_bcnt = highest + 1; // 110
2216 0 : BUN nappends = new_bcnt - bcnt; // 10
2217 0 : BUN nreplacements = ocnt - nappends; // 5
2218 :
2219 : // The first append should be to position bcnt
2220 0 : assert(bcnt == *(oid*)Tloc(offsets, nreplacements));
2221 :
2222 : return nreplacements;
2223 : }
2224 :
2225 :
2226 : static int
2227 15377765 : delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT *offsets, void *i, BUN cnt, char *storage_type, int tt)
2228 : {
2229 15377765 : void *oi = i;
2230 15377765 : BAT *b;
2231 15377765 : lock_column(tr->store, id);
2232 15377556 : sql_delta *bat = *batp;
2233 :
2234 15377556 : if (bat->cs.st == ST_DICT) {
2235 : /* possibly a new array is returned */
2236 4 : i = dict_append_val(tr, batp, i, cnt);
2237 4 : bat = *batp;
2238 4 : if (!i) {
2239 0 : unlock_column(tr->store, id);
2240 0 : return LOG_ERR;
2241 : }
2242 : }
2243 15377556 : if (bat->cs.st == ST_FOR) {
2244 : /* possibly a new array is returned */
2245 1 : i = for_append_val(&bat->cs, i, cnt, storage_type, tt);
2246 1 : bat = *batp;
2247 1 : if (!i) {
2248 0 : unlock_column(tr->store, id);
2249 0 : return LOG_ERR;
2250 : }
2251 : }
2252 :
2253 15377556 : b = temp_descriptor(bat->cs.bid);
2254 15377306 : if (b == NULL) {
2255 0 : if (i != oi)
2256 0 : GDKfree(i);
2257 0 : unlock_column(tr->store, id);
2258 0 : return LOG_ERR;
2259 : }
2260 15377306 : BUN bcnt = BATcount(b);
2261 :
2262 15377306 : if (offsets) {
2263 : // The first few might be replacements while later items might be appends.
2264 : // Handle the replacements here while leaving the appends to the code below.
2265 0 : BUN nreplacements = start_of_appends(offsets, bcnt);
2266 :
2267 0 : oid *start = Tloc(offsets, 0);
2268 0 : if (BUNreplacemulti(b, start, i, nreplacements, true) != GDK_SUCCEED) {
2269 0 : bat_destroy(b);
2270 0 : if (i != oi)
2271 0 : GDKfree(i);
2272 0 : unlock_column(tr->store, id);
2273 0 : return LOG_ERR;
2274 : }
2275 :
2276 : // Replacements have been handled. The rest are appends.
2277 0 : assert(offset == oid_nil);
2278 0 : offset = bcnt;
2279 0 : cnt -= nreplacements;
2280 : }
2281 :
2282 15377306 : if (bcnt > offset){
2283 559082 : size_t ccnt = ((offset+cnt) > bcnt)? (bcnt - offset):cnt;
2284 559082 : if (BUNreplacemultiincr(b, offset, i, ccnt, true) != GDK_SUCCEED) {
2285 0 : bat_destroy(b);
2286 0 : if (i != oi)
2287 0 : GDKfree(i);
2288 0 : unlock_column(tr->store, id);
2289 0 : return LOG_ERR;
2290 : }
2291 559290 : cnt -= ccnt;
2292 559290 : offset += ccnt;
2293 : }
2294 15377514 : if (cnt) {
2295 14818226 : if (BATcount(b) < offset) { /* add space */
2296 10304 : BUN d = offset - BATcount(b);
2297 10304 : if (BUNappendmulti(b, NULL, d, true) != GDK_SUCCEED) {
2298 0 : bat_destroy(b);
2299 0 : if (i != oi)
2300 0 : GDKfree(i);
2301 0 : unlock_column(tr->store, id);
2302 0 : return LOG_ERR;
2303 : }
2304 : }
2305 14818226 : if (BUNappendmulti(b, i, cnt, true) != GDK_SUCCEED) {
2306 0 : bat_destroy(b);
2307 0 : if (i != oi)
2308 0 : GDKfree(i);
2309 0 : unlock_column(tr->store, id);
2310 0 : return LOG_ERR;
2311 : }
2312 : }
2313 15377711 : bat_destroy(b);
2314 15377581 : if (i != oi)
2315 4 : GDKfree(i);
2316 15377581 : unlock_column(tr->store, id);
2317 15377581 : return LOG_OK;
2318 : }
2319 :
2320 : static int
2321 25983 : dup_storage( sql_trans *tr, storage *obat, storage *bat)
2322 : {
2323 25983 : if (!(bat->segs = new_segments(tr, 0)))
2324 : return LOG_ERR;
2325 25983 : return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
2326 : }
2327 :
2328 : static int
2329 15527931 : append_col_execute(sql_trans *tr, sql_delta **delta, sqlid id, BUN offset, BAT *offsets, void *incoming_data, BUN cnt, bool isbat, int tt, char *storage_type, bool isnew)
2330 : {
2331 15527931 : int ok = LOG_OK;
2332 :
2333 15527931 : if ((*delta)->cs.merged)
2334 36521 : (*delta)->cs.merged = false; /* TODO needs to move */
2335 15527931 : if (isbat) {
2336 150150 : BAT *bat = incoming_data;
2337 :
2338 150150 : if (BATcount(bat))
2339 150148 : ok = delta_append_bat(tr, delta, id, offset, offsets, bat, storage_type, isnew);
2340 : } else {
2341 15377781 : ok = delta_append_val(tr, delta, id, offset, offsets, incoming_data, cnt, storage_type, tt);
2342 : }
2343 15527725 : return ok;
2344 : }
2345 :
2346 : static int
2347 15526520 : append_col(sql_trans *tr, sql_column *c, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2348 : {
2349 15526520 : int res = LOG_OK;
2350 15526520 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
2351 :
2352 15526520 : if (isbat) {
2353 149944 : BAT *t = data;
2354 149944 : if (!BATcount(t))
2355 : return LOG_OK;
2356 : }
2357 :
2358 15525725 : if ((delta = bind_col_data(tr, c, NULL)) == NULL)
2359 : return LOG_ERR;
2360 :
2361 15525738 : assert(delta->cs.st == ST_DEFAULT || delta->cs.st == ST_DICT || delta->cs.st == ST_FOR);
2362 :
2363 15525738 : odelta = delta;
2364 15525738 : if ((res = append_col_execute(tr, &delta, c->base.id, offset, offsets, data, cnt, isbat, tpe, c->storage_type, isTempTable(c->t))) != LOG_OK)
2365 : return res;
2366 15525539 : if (odelta != delta) {
2367 0 : delta->next = odelta;
2368 0 : if (!ATOMIC_PTR_CAS(&c->data, (void**)&delta->next, delta)) {
2369 0 : delta->next = NULL;
2370 0 : destroy_delta(delta, false);
2371 0 : return LOG_CONFLICT;
2372 : }
2373 : }
2374 15525539 : if (delta->cs.st == ST_DEFAULT && c->storage_type)
2375 1 : res = sql_trans_alter_storage(tr, c, NULL);
2376 : return res;
2377 : }
2378 :
2379 : static int
2380 2184 : append_idx(sql_trans *tr, sql_idx *i, BUN offset, BAT *offsets, void *data, BUN cnt, bool isbat, int tpe)
2381 : {
2382 2184 : int res = LOG_OK;
2383 2184 : sql_delta *delta;
2384 :
2385 2184 : if (isbat) {
2386 1010 : BAT *t = data;
2387 1010 : if (!BATcount(t))
2388 : return LOG_OK;
2389 : }
2390 :
2391 2172 : if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
2392 : return LOG_ERR;
2393 :
2394 2172 : assert(delta->cs.st == ST_DEFAULT);
2395 :
2396 2172 : res = append_col_execute(tr, &delta, i->base.id, offset, offsets, data, cnt, isbat, tpe, NULL, isTempTable(i->t));
2397 2172 : return res;
2398 : }
2399 :
2400 : static int
2401 74772 : deletes_conflict_updates(sql_trans *tr, sql_table *t, oid rid, size_t cnt)
2402 : {
2403 74772 : int err = 0;
2404 :
2405 : /* TODO check for conflicting updates */
2406 74772 : (void)rid;
2407 74772 : (void)cnt;
2408 555162 : for(node *n = ol_first_node(t->columns); n && !err; n = n->next) {
2409 480390 : sql_column *c = n->data;
2410 480390 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
2411 :
2412 : /* check for active updates */
2413 480390 : if (!VALID_4_READ(d->cs.ts, tr) && d->cs.ucnt)
2414 : return 1;
2415 : }
2416 : return 0;
2417 : }
2418 :
2419 : static int
2420 71843 : storage_delete_val(sql_trans *tr, sql_table *t, storage *s, oid rid)
2421 : {
2422 71843 : int in_transaction = segments_in_transaction(tr, t);
2423 :
2424 71843 : lock_table(tr->store, t->base.id);
2425 : /* find segment of rid, split, mark new segment deleted (for tr->tid) */
2426 71843 : segment *seg = s->segs->h, *p = NULL;
2427 51777800 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2428 51777800 : if (seg->start <= rid && seg->end > rid) {
2429 71843 : if (!SEG_VALID_4_DELETE(seg,tr)) {
2430 4 : unlock_table(tr->store, t->base.id);
2431 4 : return LOG_CONFLICT;
2432 : }
2433 71839 : if (deletes_conflict_updates( tr, t, rid, 1)) {
2434 0 : unlock_table(tr->store, t->base.id);
2435 0 : return LOG_CONFLICT;
2436 : }
2437 71839 : if (!split_segment(s->segs, seg, p, tr, rid, 1, true)) {
2438 0 : unlock_table(tr->store, t->base.id);
2439 0 : return LOG_ERR;
2440 : }
2441 : break;
2442 : }
2443 : }
2444 71839 : unlock_table(tr->store, t->base.id);
2445 71839 : if (!in_transaction)
2446 12604 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2447 : return LOG_OK;
2448 : }
2449 :
2450 : static int
2451 2931 : seg_delete_range(sql_trans *tr, sql_table *t, storage *s, segment **Seg, size_t start, size_t cnt)
2452 : {
2453 2931 : segment *seg = *Seg, *p = NULL;
2454 9672 : for (; seg; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
2455 9670 : if (seg->start <= start && seg->end > start) {
2456 2981 : size_t lcnt = cnt;
2457 2981 : if (start+lcnt > seg->end)
2458 54 : lcnt = seg->end-start;
2459 2981 : if (SEG_IS_DELETED(seg, tr)) {
2460 47 : start += lcnt;
2461 47 : cnt -= lcnt;
2462 47 : continue;
2463 2934 : } else if (!SEG_VALID_4_DELETE(seg, tr))
2464 1 : return LOG_CONFLICT;
2465 2933 : if (deletes_conflict_updates( tr, t, start, lcnt))
2466 : return LOG_CONFLICT;
2467 2933 : *Seg = seg = split_segment(s->segs, seg, p, tr, start, lcnt, true);
2468 2933 : if (!seg)
2469 : return LOG_ERR;
2470 2933 : start += lcnt;
2471 2933 : cnt -= lcnt;
2472 : }
2473 9622 : if (start+cnt <= seg->end)
2474 : break;
2475 : }
2476 : return LOG_OK;
2477 : }
2478 :
2479 : static int
2480 566 : delete_range(sql_trans *tr, sql_table *t, storage *s, size_t start, size_t cnt)
2481 : {
2482 566 : segment *seg = s->segs->h;
2483 566 : return seg_delete_range(tr, t, s, &seg, start, cnt);
2484 : }
2485 :
2486 : static int
2487 303 : storage_delete_bat(sql_trans *tr, sql_table *t, storage *s, BAT *i)
2488 : {
2489 303 : int in_transaction = segments_in_transaction(tr, t);
2490 303 : BAT *oi = i; /* update ids */
2491 303 : int ok = LOG_OK;
2492 :
2493 303 : if ((i->ttype == TYPE_msk || mask_cand(i)) && !(i = BATunmask(i)))
2494 : return LOG_ERR;
2495 303 : if (BATcount(i)) {
2496 534 : if (BATtdense(i)) {
2497 231 : size_t start = i->tseqbase;
2498 231 : size_t cnt = BATcount(i);
2499 :
2500 231 : lock_table(tr->store, t->base.id);
2501 231 : ok = delete_range(tr, t, s, start, cnt);
2502 231 : unlock_table(tr->store, t->base.id);
2503 72 : } else if (complex_cand(i)) {
2504 0 : struct canditer ci;
2505 0 : oid f = 0, l = 0, cur = 0;
2506 :
2507 0 : canditer_init(&ci, NULL, i);
2508 0 : cur = f = canditer_next(&ci);
2509 :
2510 0 : lock_table(tr->store, t->base.id);
2511 0 : if (!is_oid_nil(f)) {
2512 0 : segment *seg = s->segs->h;
2513 0 : for(l = canditer_next(&ci); !is_oid_nil(l) && ok == LOG_OK; l = canditer_next(&ci)) {
2514 0 : if (cur+1 == l) {
2515 0 : cur++;
2516 0 : continue;
2517 : }
2518 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2519 0 : f = cur = l;
2520 : }
2521 0 : if (ok == LOG_OK)
2522 0 : ok = seg_delete_range(tr, t, s, &seg, f, cur-f);
2523 : }
2524 0 : unlock_table(tr->store, t->base.id);
2525 : } else {
2526 72 : if (!i->tsorted) {
2527 0 : assert(oi == i);
2528 0 : BAT *ni = NULL;
2529 0 : if (BATsort(&ni, NULL, NULL, i, NULL, NULL, false, false, false) != GDK_SUCCEED)
2530 0 : ok = LOG_ERR;
2531 0 : if (ni)
2532 0 : i = ni;
2533 : }
2534 72 : assert(i->tsorted);
2535 72 : BUN icnt = BATcount(i);
2536 72 : BATiter ii = bat_iterator(i);
2537 72 : oid *o = ii.base, n = o[0]+1;
2538 72 : size_t lcnt = 1;
2539 :
2540 72 : lock_table(tr->store, t->base.id);
2541 72 : segment *seg = s->segs->h;
2542 24025 : for (size_t i=1; i<icnt && ok == LOG_OK; i++) {
2543 23953 : if (o[i] == n) {
2544 23610 : lcnt++;
2545 23610 : n++;
2546 : } else {
2547 343 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2548 343 : lcnt = 0;
2549 : }
2550 23953 : if (!lcnt) {
2551 343 : n = o[i]+1;
2552 343 : lcnt = 1;
2553 : }
2554 : }
2555 72 : bat_iterator_end(&ii);
2556 72 : if (lcnt && ok == LOG_OK)
2557 72 : ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt);
2558 72 : unlock_table(tr->store, t->base.id);
2559 : }
2560 : }
2561 303 : if (i != oi)
2562 25 : bat_destroy(i);
2563 : // assert
2564 303 : if (!in_transaction)
2565 271 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
2566 : return ok;
2567 : }
2568 :
2569 : static void
2570 51321 : destroy_segments(segments *s)
2571 : {
2572 51321 : if (!s || sql_ref_dec(&s->r) > 0)
2573 0 : return;
2574 51321 : segment *seg = s->h;
2575 110994 : while(seg) {
2576 59673 : segment *n = ATOMIC_PTR_GET(&seg->next);
2577 59673 : ATOMIC_PTR_DESTROY(&seg->next);
2578 59673 : _DELETE(seg);
2579 59673 : seg = n;
2580 : }
2581 51321 : _DELETE(s);
2582 : }
2583 :
2584 : static void
2585 52711 : destroy_storage(storage *bat)
2586 : {
2587 52711 : if (ATOMIC_DEC(&bat->cs.refcnt) > 0)
2588 : return;
2589 51180 : if (bat->next)
2590 14396 : destroy_storage(bat->next);
2591 51180 : destroy_segments(bat->segs);
2592 51180 : if (bat->cs.uibid)
2593 30575 : temp_destroy(bat->cs.uibid);
2594 51180 : if (bat->cs.uvbid)
2595 30575 : temp_destroy(bat->cs.uvbid);
2596 51180 : if (bat->cs.bid)
2597 51180 : temp_destroy(bat->cs.bid);
2598 51180 : bat->cs.bid = bat->cs.uibid = bat->cs.uvbid = 0;
2599 51180 : _DELETE(bat);
2600 : }
2601 :
2602 : static int
2603 162112 : segments_conflict(sql_trans *tr, segments *segs, int uncommitted)
2604 : {
2605 162112 : if (uncommitted) {
2606 433727 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2607 286686 : if (!VALID_4_READ(s->ts,tr))
2608 : return 1;
2609 : } else {
2610 122720 : for (segment *s = segs->h; s; s = ATOMIC_PTR_GET(&s->next))
2611 108847 : if (s->ts < TRANSACTION_ID_BASE && !VALID_4_READ(s->ts,tr))
2612 : return 1;
2613 : }
2614 :
2615 : return 0;
2616 : }
2617 :
2618 : static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
2619 :
2620 : storage *
2621 2181837 : bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
2622 : {
2623 2181837 : storage *obat;
2624 :
2625 2181837 : obat = ATOMIC_PTR_GET(&t->data);
2626 :
2627 2181837 : if (obat->cs.ts != tr->tid)
2628 1505767 : if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
2629 1505712 : if (obat->cs.ts >= TRANSACTION_ID_BASE) {
2630 : /* abort */
2631 15490 : if (clear)
2632 15490 : *clear = true;
2633 15490 : return NULL;
2634 : }
2635 :
2636 2166347 : if (!clear)
2637 : return obat;
2638 :
2639 : /* remainder is only to handle clear */
2640 26283 : if (segments_conflict(tr, obat->segs, 1)) {
2641 300 : *clear = true;
2642 300 : return NULL;
2643 : }
2644 25983 : if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
2645 : return NULL;
2646 25983 : storage *bat = ZNEW(storage);
2647 25983 : if (!bat)
2648 : return NULL;
2649 25983 : ATOMIC_INIT(&bat->cs.refcnt, 1);
2650 25983 : if (dup_storage(tr, obat, bat) != LOG_OK) {
2651 0 : destroy_storage(bat);
2652 0 : return NULL;
2653 : }
2654 25983 : bat->cs.cleared = true;
2655 25983 : bat->cs.ts = tr->tid;
2656 : /* only one writer else abort */
2657 25983 : bat->next = obat;
2658 25983 : if (!ATOMIC_PTR_CAS(&t->data, (void**)&bat->next, bat)) {
2659 6 : bat->next = NULL;
2660 6 : destroy_storage(bat);
2661 6 : if (clear)
2662 6 : *clear = true;
2663 6 : return NULL;
2664 : }
2665 : return bat;
2666 : }
2667 :
2668 : static int
2669 72195 : delete_tab(sql_trans *tr, sql_table * t, void *ib, bool isbat)
2670 : {
2671 72195 : int ok = LOG_OK;
2672 72195 : BAT *b = ib;
2673 72195 : storage *bat;
2674 :
2675 72195 : if (isbat && !BATcount(b))
2676 : return ok;
2677 :
2678 72146 : if (t == NULL)
2679 : return LOG_ERR;
2680 :
2681 72146 : if ((bat = bind_del_data(tr, t, NULL)) == NULL)
2682 : return LOG_ERR;
2683 :
2684 72146 : if (isbat)
2685 303 : ok = storage_delete_bat(tr, t, bat, ib);
2686 : else
2687 71843 : ok = storage_delete_val(tr, t, bat, *(oid*)ib);
2688 : return ok;
2689 : }
2690 :
2691 : static size_t
2692 0 : dcount_col(sql_trans *tr, sql_column *c)
2693 : {
2694 0 : sql_delta *b;
2695 :
2696 0 : if (!isTable(c->t))
2697 : return 0;
2698 0 : b = col_timestamp_delta(tr, c);
2699 0 : if (!b)
2700 : return 1;
2701 :
2702 0 : storage *s = ATOMIC_PTR_GET(&c->t->data);
2703 0 : if (!s || !s->segs->t)
2704 : return 1;
2705 0 : size_t cnt = s->segs->t->end;
2706 0 : if (cnt) {
2707 0 : BAT *v = cs_bind_bat( &b->cs, QUICK, cnt);
2708 0 : size_t dcnt = 0;
2709 :
2710 0 : if (v)
2711 0 : dcnt = BATguess_uniques(v, NULL);
2712 0 : return dcnt;
2713 : }
2714 : return cnt;
2715 : }
2716 :
2717 : static BAT *
2718 3749839 : bind_no_view(BAT *b, bool quick)
2719 : {
2720 3749839 : if (isVIEW(b)) { /* If it is a view get the parent BAT */
2721 3747910 : BAT *nb = BBP_desc(VIEWtparent(b));
2722 3747910 : bat_destroy(b);
2723 3747908 : if (!(b = quick ? quick_descriptor(nb->batCacheid) : temp_descriptor(nb->batCacheid)))
2724 : return NULL;
2725 : }
2726 : return b;
2727 : }
2728 :
2729 : static int
2730 0 : set_stats_col(sql_trans *tr, sql_column *c, double *unique_est, char *min, char *max)
2731 : {
2732 0 : int ok = 0;
2733 0 : assert(tr->active);
2734 0 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2735 0 : return 0;
2736 0 : lock_column(tr->store, c->base.id);
2737 0 : if (unique_est) {
2738 0 : sql_delta *d;
2739 0 : if ((d = ATOMIC_PTR_GET(&c->data)) && d->cs.st == ST_DEFAULT) {
2740 0 : BAT *b;
2741 0 : if ((b = bind_col(tr, c, RDONLY)) && (b = bind_no_view(b, false))) {
2742 0 : MT_lock_set(&b->theaplock);
2743 0 : b->tunique_est = *unique_est;
2744 0 : MT_lock_unset(&b->theaplock);
2745 0 : bat_destroy(b);
2746 : }
2747 : }
2748 : }
2749 0 : if (min) {
2750 0 : _DELETE(c->min);
2751 0 : size_t minlen = ATOMlen(c->type.type->localtype, min);
2752 0 : if ((c->min = GDKmalloc(minlen)) != NULL) {
2753 0 : memcpy(c->min, min, minlen);
2754 0 : ok = 1;
2755 : }
2756 : }
2757 0 : if (max) {
2758 0 : _DELETE(c->max);
2759 0 : size_t maxlen = ATOMlen(c->type.type->localtype, max);
2760 0 : if ((c->max = GDKmalloc(maxlen)) != NULL) {
2761 0 : memcpy(c->max, max, maxlen);
2762 0 : ok = 1;
2763 : }
2764 : }
2765 0 : unlock_column(tr->store, c->base.id);
2766 0 : return ok;
2767 : }
2768 :
2769 : static int
2770 26 : min_max_col(sql_trans *tr, sql_column *c)
2771 : {
2772 26 : int ok = 0;
2773 26 : BAT *b = NULL;
2774 26 : sql_delta *d = NULL;
2775 :
2776 26 : assert(tr->active);
2777 26 : if (!c || !ATOMIC_PTR_GET(&c->data) || !isTable(c->t) || !c->t->s)
2778 0 : return 0;
2779 26 : if (c->min && c->max)
2780 : return 1;
2781 26 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2782 26 : if (d->cs.st == ST_FOR)
2783 : return 0;
2784 26 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2785 26 : lock_column(tr->store, c->base.id);
2786 26 : if (c->min && c->max) {
2787 0 : unlock_column(tr->store, c->base.id);
2788 0 : return 1;
2789 : }
2790 26 : _DELETE(c->min);
2791 26 : _DELETE(c->max);
2792 26 : if ((b = bind_col(tr, c, access))) {
2793 26 : if (!(b = bind_no_view(b, false))) {
2794 0 : unlock_column(tr->store, c->base.id);
2795 0 : return 0;
2796 : }
2797 26 : BATiter bi = bat_iterator(b);
2798 26 : if (bi.minpos != BUN_NONE && bi.maxpos != BUN_NONE) {
2799 17 : const void *nmin = BUNtail(bi, bi.minpos), *nmax = BUNtail(bi, bi.maxpos);
2800 17 : size_t minlen = ATOMlen(bi.type, nmin), maxlen = ATOMlen(bi.type, nmax);
2801 :
2802 17 : if (!(c->min = GDKmalloc(minlen)) || !(c->max = GDKmalloc(maxlen))) {
2803 0 : _DELETE(c->min);
2804 0 : _DELETE(c->max);
2805 : } else {
2806 17 : memcpy(c->min, nmin, minlen);
2807 17 : memcpy(c->max, nmax, maxlen);
2808 17 : ok = 1;
2809 : }
2810 : }
2811 26 : bat_iterator_end(&bi);
2812 26 : bat_destroy(b);
2813 : }
2814 26 : unlock_column(tr->store, c->base.id);
2815 : }
2816 : return ok;
2817 : }
2818 :
2819 : static size_t
2820 17 : count_segs(segment *s)
2821 : {
2822 17 : size_t nr = 0;
2823 :
2824 72 : for( ; s; s = ATOMIC_PTR_GET(&s->next))
2825 55 : nr++;
2826 17 : return nr;
2827 : }
2828 :
2829 : static size_t
2830 34 : count_del(sql_trans *tr, sql_table *t, int access)
2831 : {
2832 34 : storage *d;
2833 :
2834 34 : if (!isTable(t))
2835 : return 0;
2836 34 : d = tab_timestamp_storage(tr, t);
2837 34 : if (!d)
2838 : return 0;
2839 34 : if (access == 2)
2840 0 : return d->cs.ucnt;
2841 34 : if (access == 1)
2842 0 : return count_inserts(d->segs->h, tr);
2843 34 : if (access == 10) /* special case for counting the number of segments */
2844 17 : return count_segs(d->segs->h);
2845 17 : return count_deletes(d->segs->h, tr);
2846 : }
2847 :
2848 : static int
2849 27002 : sorted_col(sql_trans *tr, sql_column *col)
2850 : {
2851 27002 : int sorted = 0;
2852 :
2853 27002 : assert(tr->active);
2854 27002 : if (!isTable(col->t) || !col->t->s)
2855 : return 0;
2856 :
2857 27002 : if (col && ATOMIC_PTR_GET(&col->data) && !col->storage_type /* no order on dict compressed tables */) {
2858 26983 : BAT *b = bind_col(tr, col, QUICK);
2859 :
2860 26983 : if (b)
2861 26983 : sorted = b->tsorted || b->trevsorted;
2862 : }
2863 : return sorted;
2864 : }
2865 :
2866 : static int
2867 7547 : unique_col(sql_trans *tr, sql_column *col)
2868 : {
2869 7547 : int distinct = 0;
2870 :
2871 7547 : assert(tr->active);
2872 7547 : if (!isTable(col->t) || !col->t->s)
2873 : return 0;
2874 :
2875 7547 : if (col && ATOMIC_PTR_GET(&col->data)) {
2876 7547 : BAT *b = bind_col(tr, col, QUICK);
2877 :
2878 7547 : if (b)
2879 7547 : distinct = b->tkey;
2880 : }
2881 : return distinct;
2882 : }
2883 :
2884 : static int
2885 1920 : double_elim_col(sql_trans *tr, sql_column *col)
2886 : {
2887 1920 : int de = 0;
2888 1920 : sql_delta *d;
2889 :
2890 1920 : assert(tr->active);
2891 1920 : if (!isTable(col->t) || !col->t->s)
2892 : return 0;
2893 :
2894 1920 : if (col && (d=ATOMIC_PTR_GET(&col->data))!=NULL && col->storage_type) {
2895 6 : if (d->cs.st == ST_DICT) {
2896 6 : BAT *b = bind_col(tr, col, QUICK);
2897 6 : if (b && b->ttype == TYPE_bte)
2898 : de = 1;
2899 0 : else if (b && b->ttype == TYPE_sht)
2900 1920 : de = 2;
2901 : }
2902 1914 : } else if (col && ATOMstorage(col->type.type->localtype) == TYPE_str && ATOMIC_PTR_GET(&col->data)) {
2903 1914 : BAT *b = bind_col(tr, col, QUICK);
2904 :
2905 1914 : if (b && ATOMstorage(b->ttype) == TYPE_str) { /* check double elimination */
2906 1914 : de = GDK_ELIMDOUBLES(b->tvheap);
2907 1914 : if (de)
2908 1688 : de = (int) ceil(b->tvheap->free / (double) GDK_VAROFFSET);
2909 : }
2910 1688 : assert(de >= 0 && de <= 16);
2911 : }
2912 : return de;
2913 : }
2914 :
2915 : static int
2916 3787802 : col_stats(sql_trans *tr, sql_column *c, bool *nonil, bool *unique, double *unique_est, ValPtr min, ValPtr max)
2917 : {
2918 3787802 : int ok = 0;
2919 3787802 : BAT *b = NULL, *off = NULL, *upv = NULL;
2920 3787802 : sql_delta *d = NULL;
2921 :
2922 3787802 : (void) tr;
2923 3787802 : assert(tr->active);
2924 3787802 : *nonil = false;
2925 3787802 : *unique = false;
2926 3787802 : *unique_est = 0.0;
2927 3787802 : if (!c || !isTable(c->t) || !c->t->s)
2928 : return ok;
2929 :
2930 3787583 : if ((d = ATOMIC_PTR_GET(&c->data))) {
2931 3748568 : if (d->cs.st == ST_FOR) {
2932 30 : *nonil = true; /* TODO for min/max. I will do it later */
2933 30 : return ok;
2934 : }
2935 3748538 : int eclass = c->type.type->eclass;
2936 3748538 : int access = d->cs.st == ST_DICT ? RD_EXT : RDONLY;
2937 3748538 : if ((b = bind_col(tr, c, access))) {
2938 3748590 : if (!(b = bind_no_view(b, false)))
2939 0 : return ok;
2940 3748520 : BATiter bi = bat_iterator(b);
2941 3748527 : *nonil = bi.nonil && !bi.nil;
2942 :
2943 3748527 : if ((EC_NUMBER(eclass) || EC_VARCHAR(eclass) || EC_TEMP_NOFRAC(eclass) || eclass == EC_DATE) &&
2944 3451060 : d->cs.ucnt == 0 && (bi.minpos != BUN_NONE || bi.maxpos != BUN_NONE)) {
2945 2273262 : if (c->min && VALinit(min, bi.type, c->min))
2946 : ok |= 1;
2947 2273208 : else if (bi.minpos != BUN_NONE && VALinit(min, bi.type, BUNtail(bi, bi.minpos)))
2948 2264131 : ok |= 1;
2949 2273221 : if (c->max && VALinit(max, bi.type, c->max))
2950 54 : ok |= 2;
2951 2273167 : else if (bi.maxpos != BUN_NONE && VALinit(max, bi.type, BUNtail(bi, bi.maxpos)))
2952 2257989 : ok |= 2;
2953 : }
2954 3748477 : if (d->cs.ucnt == 0) {
2955 3745724 : if (d->cs.st == ST_DEFAULT) {
2956 3745019 : *unique = bi.key;
2957 3745019 : *unique_est = bi.unique_est;
2958 3745019 : if (*unique_est == 0)
2959 1194935 : *unique_est = (double)BATguess_uniques(b,NULL);
2960 705 : } else if (d->cs.st == ST_DICT && (off = bind_col(tr, c, QUICK)) && (off = bind_no_view(off, true))) {
2961 : /* for dict, check the offsets bat for uniqueness */
2962 705 : MT_lock_set(&off->theaplock);
2963 705 : *unique = off->tkey;
2964 705 : *unique_est = off->tunique_est;
2965 705 : MT_lock_unset(&off->theaplock);
2966 : }
2967 : }
2968 3748482 : bat_iterator_end(&bi);
2969 3748544 : bat_destroy(b);
2970 3748534 : if (*nonil && d->cs.ucnt > 0) {
2971 : /* This could use a quick descriptor */
2972 519 : if (!(upv = bind_col(tr, c, RD_UPD_VAL)) || !(upv = bind_no_view(upv, false))) {
2973 0 : *nonil = false;
2974 : } else {
2975 519 : MT_lock_set(&upv->theaplock);
2976 519 : *nonil &= upv->tnonil && !upv->tnil;
2977 519 : MT_lock_unset(&upv->theaplock);
2978 519 : bat_destroy(upv);
2979 : }
2980 : }
2981 : }
2982 : }
2983 : return ok;
2984 : }
2985 :
2986 : static int
2987 257 : col_set_range(sql_trans *tr, sql_column *col, sql_part *pt, bool add_range)
2988 : {
2989 257 : assert(tr->active);
2990 257 : if (!isTable(col->t) || !col->t->s)
2991 : return LOG_OK;
2992 :
2993 252 : if (col && ATOMIC_PTR_GET(&col->data)) {
2994 252 : BAT *b = bind_col(tr, col, QUICK);
2995 :
2996 252 : if (b) { /* add props for ranges [min, max> */
2997 252 : MT_lock_set(&b->theaplock);
2998 252 : if (add_range) {
2999 179 : BATsetprop_nolock(b, GDK_MIN_BOUND, b->ttype, pt->part.range.minvalue);
3000 179 : if (ATOMcmp(b->ttype, pt->part.range.maxvalue, ATOMnilptr(b->ttype)) != 0)
3001 103 : BATsetprop_nolock(b, GDK_MAX_BOUND, b->ttype, pt->part.range.maxvalue);
3002 : else
3003 76 : BATrmprop_nolock(b, GDK_MAX_BOUND);
3004 179 : if (!pt->with_nills || !col->null)
3005 117 : BATsetprop_nolock(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3006 : } else {
3007 73 : BATrmprop_nolock(b, GDK_MIN_BOUND);
3008 73 : BATrmprop_nolock(b, GDK_MAX_BOUND);
3009 73 : BATrmprop_nolock(b, GDK_NOT_NULL);
3010 : }
3011 252 : MT_lock_unset(&b->theaplock);
3012 : }
3013 : }
3014 : return LOG_OK;
3015 : }
3016 :
3017 : static int
3018 4158 : col_not_null(sql_trans *tr, sql_column *col, bool not_null)
3019 : {
3020 4158 : assert(tr->active);
3021 4158 : if (!isTable(col->t) || !col->t->s)
3022 : return LOG_OK;
3023 :
3024 4128 : if (col && ATOMIC_PTR_GET(&col->data)) {
3025 4128 : BAT *b = bind_col(tr, col, QUICK);
3026 :
3027 4128 : if (b) { /* add props for ranges [min, max> */
3028 4128 : if (not_null) {
3029 4126 : BATsetprop(b, GDK_NOT_NULL, b->ttype, ATOMnilptr(b->ttype));
3030 : } else {
3031 2 : BATrmprop(b, GDK_NOT_NULL);
3032 : }
3033 : }
3034 : }
3035 : return LOG_OK;
3036 : }
3037 :
3038 : static int
3039 29108 : load_cs(sql_trans *tr, column_storage *cs, int type, sqlid id)
3040 : {
3041 29108 : sqlstore *store = tr->store;
3042 29108 : int bid = log_find_bat(store->logger, id);
3043 29108 : if (bid <= 0)
3044 : return LOG_ERR;
3045 29108 : cs->bid = temp_dup(bid);
3046 29108 : cs->ucnt = 0;
3047 29108 : cs->uibid = e_bat(TYPE_oid);
3048 29108 : cs->uvbid = e_bat(type);
3049 29108 : if (cs->uibid == BID_NIL || cs->uvbid == BID_NIL)
3050 : return LOG_ERR;
3051 : return LOG_OK;
3052 : }
3053 :
3054 : static int
3055 67604 : log_create_delta(sql_trans *tr, sql_delta *bat, sqlid id)
3056 : {
3057 67604 : int res = LOG_OK;
3058 67604 : gdk_return ok;
3059 67604 : BAT *b = temp_descriptor(bat->cs.bid);
3060 :
3061 67604 : if (b == NULL)
3062 : return LOG_ERR;
3063 :
3064 67604 : if (!bat->cs.uibid)
3065 67596 : bat->cs.uibid = e_bat(TYPE_oid);
3066 67604 : if (!bat->cs.uvbid)
3067 67596 : bat->cs.uvbid = e_bat(b->ttype);
3068 67604 : if (bat->cs.uibid == BID_NIL || bat->cs.uvbid == BID_NIL)
3069 0 : res = LOG_ERR;
3070 67604 : if (GDKinmemory(0)) {
3071 174 : bat_destroy(b);
3072 174 : return res;
3073 : }
3074 :
3075 67430 : bat_set_access(b, BAT_READ);
3076 67430 : sqlstore *store = tr->store;
3077 67430 : ok = log_bat_persists(store->logger, b, id);
3078 67430 : bat_destroy(b);
3079 67430 : if(res != LOG_OK)
3080 : return res;
3081 67430 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3082 : }
3083 :
3084 : static int
3085 0 : new_persistent_delta( sql_delta *bat)
3086 : {
3087 0 : bat->cs.ucnt = 0;
3088 0 : return LOG_OK;
3089 : }
3090 :
3091 : static void
3092 128720 : create_delta( sql_delta *d, BAT *b)
3093 : {
3094 128720 : bat_set_access(b, BAT_READ);
3095 128720 : d->cs.bid = temp_create(b);
3096 128720 : d->cs.uibid = d->cs.uvbid = 0;
3097 128720 : d->cs.ucnt = 0;
3098 128720 : }
3099 :
3100 : static bat
3101 7280 : copyBat (bat i, int type, oid seq)
3102 : {
3103 7280 : BAT *b, *tb;
3104 7280 : bat res;
3105 :
3106 7280 : if (!i)
3107 : return i;
3108 7280 : tb = quick_descriptor(i);
3109 7280 : if (tb == NULL)
3110 : return 0;
3111 7280 : b = BATconstant(seq, type, ATOMnilptr(type), BATcount(tb), PERSISTENT);
3112 7280 : if (b == NULL)
3113 : return 0;
3114 :
3115 7280 : bat_set_access(b, BAT_READ);
3116 :
3117 7280 : res = temp_create(b);
3118 7280 : bat_destroy(b);
3119 7280 : return res;
3120 : }
3121 :
3122 : static int
3123 151035 : create_col(sql_trans *tr, sql_column *c)
3124 : {
3125 151035 : int ok = LOG_OK, new = 0;
3126 151035 : int type = c->type.type->localtype;
3127 151035 : sql_delta *bat = ATOMIC_PTR_GET(&c->data);
3128 :
3129 151035 : if (!bat) {
3130 151035 : new = 1;
3131 151035 : bat = ZNEW(sql_delta);
3132 151035 : if (!bat)
3133 : return LOG_ERR;
3134 151035 : ATOMIC_PTR_SET(&c->data, bat);
3135 151035 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3136 : }
3137 :
3138 151035 : if (new)
3139 151035 : bat->cs.ts = tr->tid;
3140 :
3141 151035 : if (!isNew(c)&& !isTempTable(c->t)){
3142 22293 : bat->cs.ts = tr->ts;
3143 22293 : ok = load_cs(tr, &bat->cs, type, c->base.id);
3144 22293 : if (ok == LOG_OK && c->storage_type) {
3145 4 : if (strcmp(c->storage_type, "DICT") == 0) {
3146 2 : sqlstore *store = tr->store;
3147 2 : int bid = log_find_bat(store->logger, -c->base.id);
3148 2 : if (bid <= 0)
3149 : return LOG_ERR;
3150 2 : bat->cs.ebid = temp_dup(bid);
3151 2 : bat->cs.st = ST_DICT;
3152 2 : } else if (strncmp(c->storage_type, "FOR", 3) == 0) {
3153 2 : bat->cs.st = ST_FOR;
3154 : }
3155 : }
3156 22293 : return ok;
3157 128742 : } else if (bat && bat->cs.bid) {
3158 0 : return new_persistent_delta(ATOMIC_PTR_GET(&c->data));
3159 : } else {
3160 128742 : sql_column *fc = NULL;
3161 128742 : size_t cnt = 0;
3162 :
3163 : /* alter ? */
3164 128742 : if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc = ol_first_node(c->t->columns)->data) != NULL) {
3165 79180 : storage *s = tab_timestamp_storage(tr, fc->t);
3166 79180 : if (s == NULL)
3167 : return LOG_ERR;
3168 79180 : cnt = segs_end(s->segs, tr, c->t);
3169 : }
3170 128742 : if (cnt && fc != c) {
3171 22 : sql_delta *d = ATOMIC_PTR_GET(&fc->data);
3172 :
3173 22 : if (d->cs.bid) {
3174 22 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3175 22 : if(bat->cs.bid == BID_NIL)
3176 22 : ok = LOG_ERR;
3177 : }
3178 22 : if (d->cs.uibid) {
3179 10 : bat->cs.uibid = e_bat(TYPE_oid);
3180 10 : if (bat->cs.uibid == BID_NIL)
3181 22 : ok = LOG_ERR;
3182 : }
3183 22 : if (d->cs.uvbid) {
3184 10 : bat->cs.uvbid = e_bat(type);
3185 10 : if(bat->cs.uvbid == BID_NIL)
3186 0 : ok = LOG_ERR;
3187 : }
3188 : } else {
3189 128720 : BAT *b = bat_new(type, c->t->sz, PERSISTENT);
3190 128720 : if (!b) {
3191 : ok = LOG_ERR;
3192 : } else {
3193 128720 : create_delta(ATOMIC_PTR_GET(&c->data), b);
3194 128720 : bat_destroy(b);
3195 : }
3196 :
3197 128720 : if (!new) {
3198 0 : bat->cs.uibid = e_bat(TYPE_oid);
3199 0 : if (bat->cs.uibid == BID_NIL)
3200 0 : ok = LOG_ERR;
3201 0 : bat->cs.uvbid = e_bat(type);
3202 0 : if(bat->cs.uvbid == BID_NIL)
3203 0 : ok = LOG_ERR;
3204 : }
3205 : }
3206 128742 : bat->cs.ucnt = 0;
3207 :
3208 128742 : if (new && !isTempTable(c->t) && !isNew(c->t) /* alter */)
3209 90 : trans_add_obj(tr, &c->base, bat, &tc_gc_col, &commit_create_col, &log_create_col);
3210 : }
3211 : return ok;
3212 : }
3213 :
3214 : static int
3215 61249 : log_create_col_(sql_trans *tr, sql_column *c)
3216 : {
3217 61249 : assert(!isTempTable(c->t));
3218 61249 : return log_create_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3219 : }
3220 :
3221 : static int
3222 86 : log_create_col(sql_trans *tr, sql_change *change)
3223 : {
3224 86 : return log_create_col_(tr, (sql_column*)change->obj);
3225 : }
3226 :
3227 : static int
3228 116111 : commit_create_delta( sql_trans *tr, sql_table *t, sql_base *base, sql_delta *delta, ulng commit_ts, ulng oldest)
3229 : {
3230 116111 : (void) t; // TODO transaction_layer_revamp: remove if unnecessary
3231 116111 : (void)oldest;
3232 116111 : assert(delta->cs.ts == tr->tid);
3233 116111 : delta->cs.ts = commit_ts;
3234 :
3235 116111 : assert(delta->next == NULL);
3236 116111 : if (!delta->cs.merged)
3237 116110 : merge_delta(delta);
3238 116111 : if (!tr->parent)
3239 116107 : base->new = 0;
3240 116111 : return LOG_OK;
3241 : }
3242 :
3243 : static int
3244 90 : commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3245 : {
3246 90 : sql_column *c = (sql_column*)change->obj;
3247 90 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3248 90 : if (!tr->parent)
3249 89 : c->base.new = 0;
3250 90 : return commit_create_delta( tr, c->t, &c->base, delta, commit_ts, oldest);
3251 : }
3252 :
3253 : /* will be called for new idx's and when new index columns are created */
3254 : static int
3255 9448 : create_idx(sql_trans *tr, sql_idx *ni)
3256 : {
3257 9448 : int ok = LOG_OK, new = 0;
3258 9448 : sql_delta *bat = ATOMIC_PTR_GET(&ni->data);
3259 9448 : int type = TYPE_lng;
3260 :
3261 9448 : if (oid_index(ni->type))
3262 955 : type = TYPE_oid;
3263 :
3264 9448 : if (!bat) {
3265 9448 : new = 1;
3266 9448 : bat = ZNEW(sql_delta);
3267 9448 : if (!bat)
3268 : return LOG_ERR;
3269 9448 : ATOMIC_PTR_INIT(&ni->data, bat);
3270 9448 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3271 : }
3272 :
3273 9448 : if (new)
3274 9448 : bat->cs.ts = tr->tid;
3275 :
3276 9448 : if (!isNew(ni) && !isTempTable(ni->t)){
3277 2190 : bat->cs.ts = 1;
3278 2190 : return load_cs(tr, &bat->cs, type, ni->base.id);
3279 7258 : } else if (bat && bat->cs.bid && !isTempTable(ni->t)) {
3280 0 : return new_persistent_delta(ATOMIC_PTR_GET(&ni->data));
3281 : } else {
3282 7258 : sql_column *c = ol_first_node(ni->t->columns)->data;
3283 7258 : sql_delta *d = col_timestamp_delta(tr, c);
3284 :
3285 7258 : if (d) {
3286 : /* Here we also handle indices created through alter stmts */
3287 : /* These need to be created aligned to the existing data */
3288 7258 : if (d->cs.bid) {
3289 7258 : bat->cs.bid = copyBat(d->cs.bid, type, 0);
3290 7258 : if(bat->cs.bid == BID_NIL)
3291 7258 : ok = LOG_ERR;
3292 : }
3293 : } else {
3294 : return LOG_ERR;
3295 : }
3296 :
3297 7258 : bat->cs.ucnt = 0;
3298 :
3299 7258 : if (!new) {
3300 0 : bat->cs.uibid = e_bat(TYPE_oid);
3301 0 : if (bat->cs.uibid == BID_NIL)
3302 0 : ok = LOG_ERR;
3303 0 : bat->cs.uvbid = e_bat(type);
3304 0 : if(bat->cs.uvbid == BID_NIL)
3305 0 : ok = LOG_ERR;
3306 : }
3307 7258 : bat->cs.ucnt = 0;
3308 7258 : if (new && !isTempTable(ni->t) && !isNew(ni->t) /* alter */)
3309 631 : trans_add_obj(tr, &ni->base, bat, &tc_gc_idx, &commit_create_idx, &log_create_idx);
3310 : }
3311 : return ok;
3312 : }
3313 :
3314 : static int
3315 6355 : log_create_idx_(sql_trans *tr, sql_idx *i)
3316 : {
3317 6355 : assert(!isTempTable(i->t));
3318 6355 : return log_create_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3319 : }
3320 :
3321 : static int
3322 617 : log_create_idx(sql_trans *tr, sql_change *change)
3323 : {
3324 617 : return log_create_idx_(tr, (sql_idx*)change->obj);
3325 : }
3326 :
3327 : static int
3328 631 : commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3329 : {
3330 631 : sql_idx *i = (sql_idx*)change->obj;
3331 631 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3332 631 : if (!tr->parent)
3333 631 : i->base.new = 0;
3334 631 : return commit_create_delta( tr, i->t, &i->base, delta, commit_ts, oldest);
3335 : return LOG_OK;
3336 : }
3337 :
3338 : static int
3339 4625 : load_storage(sql_trans *tr, sql_table *t, storage *s, sqlid id)
3340 : {
3341 4625 : int ok = load_cs(tr, &s->cs, TYPE_msk, id);
3342 4625 : BAT *b = NULL, *ib = NULL;
3343 :
3344 4625 : if (ok != LOG_OK)
3345 : return ok;
3346 4625 : if (!(b = temp_descriptor(s->cs.bid)))
3347 : return LOG_ERR;
3348 4625 : ib = b;
3349 :
3350 4625 : if ((b->ttype == TYPE_msk || mask_cand(b)) && !(b = BATunmask(b))) {
3351 0 : bat_destroy(ib);
3352 0 : return LOG_ERR;
3353 : }
3354 :
3355 4625 : if (BATcount(b)) {
3356 284 : if (ok == LOG_OK && !(s->segs = new_segments(tr, BATcount(ib)))) {
3357 0 : bat_destroy(ib);
3358 0 : return LOG_ERR;
3359 : }
3360 418 : if (BATtdense(b)) {
3361 134 : size_t start = b->tseqbase;
3362 134 : size_t cnt = BATcount(b);
3363 134 : ok = delete_range(tr, t, s, start, cnt);
3364 : } else {
3365 150 : assert(b->tsorted);
3366 150 : BUN icnt = BATcount(b);
3367 150 : BATiter bi = bat_iterator(b);
3368 150 : size_t lcnt = 1;
3369 150 : oid n;
3370 150 : segment *seg = s->segs->h;
3371 150 : if (complex_cand(b)) {
3372 0 : oid o = * (oid *) Tpos(&bi, 0);
3373 0 : n = o + 1;
3374 0 : for (BUN i = 1; i < icnt; i++) {
3375 0 : o = * (oid *) Tpos(&bi, i);
3376 0 : if (o == n) {
3377 0 : lcnt++;
3378 0 : n++;
3379 : } else {
3380 0 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3381 : break;
3382 : lcnt = 0;
3383 : }
3384 0 : if (!lcnt) {
3385 0 : n = o + 1;
3386 0 : lcnt = 1;
3387 : }
3388 : }
3389 : } else {
3390 150 : oid *o = bi.base;
3391 150 : n = o[0]+1;
3392 207947 : for (size_t i=1; i<icnt; i++) {
3393 207797 : if (o[i] == n) {
3394 205847 : lcnt++;
3395 205847 : n++;
3396 : } else {
3397 1950 : if ((ok = seg_delete_range(tr, t, s, &seg, n-lcnt, lcnt)) != LOG_OK)
3398 : break;
3399 : lcnt = 0;
3400 : }
3401 207797 : if (!lcnt) {
3402 1950 : n = o[i]+1;
3403 1950 : lcnt = 1;
3404 : }
3405 : }
3406 : }
3407 150 : if (lcnt && ok == LOG_OK)
3408 150 : ok = delete_range(tr, t, s, n-lcnt, lcnt);
3409 150 : bat_iterator_end(&bi);
3410 : }
3411 284 : if (ok == LOG_OK)
3412 4920 : for (segment *seg = s->segs->h; seg; seg = ATOMIC_PTR_GET(&seg->next))
3413 4636 : if (seg->ts == tr->tid)
3414 2405 : seg->ts = 1;
3415 : } else {
3416 4341 : if (ok == LOG_OK) {
3417 4341 : BAT *bb = quick_descriptor(s->cs.bid);
3418 :
3419 4341 : if (!bb || !(s->segs = new_segments(tr, BATcount(bb)))) {
3420 : ok = LOG_ERR;
3421 : } else {
3422 4341 : segment *seg = s->segs->h;
3423 4341 : if (seg->ts == tr->tid)
3424 4341 : seg->ts = 1;
3425 : }
3426 : }
3427 : }
3428 4625 : if (b != ib)
3429 4625 : bat_destroy(b);
3430 4625 : bat_destroy(ib);
3431 :
3432 4625 : return ok;
3433 : }
3434 :
3435 : static int
3436 25236 : create_del(sql_trans *tr, sql_table *t)
3437 : {
3438 25236 : int ok = LOG_OK, new = 0;
3439 25236 : BAT *b;
3440 25236 : storage *bat = ATOMIC_PTR_GET(&t->data);
3441 :
3442 25236 : if (!bat) {
3443 25236 : new = 1;
3444 25236 : bat = ZNEW(storage);
3445 25236 : if(!bat)
3446 : return LOG_ERR;
3447 25236 : ATOMIC_PTR_INIT(&t->data, bat);
3448 25236 : ATOMIC_INIT(&bat->cs.refcnt, 1);
3449 25236 : bat->cs.ts = tr->tid;
3450 : }
3451 :
3452 25236 : if (!isNew(t) && !isTempTable(t)) {
3453 4625 : bat->cs.ts = tr->ts;
3454 4625 : return load_storage(tr, t, bat, t->base.id);
3455 20611 : } else if (bat->cs.bid) {
3456 : return ok;
3457 : } else {
3458 20611 : assert(!bat->segs);
3459 20611 : if (!(bat->segs = new_segments(tr, 0)))
3460 : return LOG_ERR;
3461 :
3462 20611 : b = bat_new(TYPE_msk, t->sz, PERSISTENT);
3463 20611 : if(b != NULL) {
3464 20611 : bat_set_access(b, BAT_READ);
3465 20611 : bat->cs.bid = temp_create(b);
3466 20611 : bat_destroy(b);
3467 : } else {
3468 : return LOG_ERR;
3469 : }
3470 20611 : if (new)
3471 27168 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_create_del, isTempTable(t) ? NULL : &log_create_del);
3472 : }
3473 : return ok;
3474 : }
3475 :
3476 : static int
3477 193683 : log_segment(sql_trans *tr, segment *s, sqlid id)
3478 : {
3479 193683 : sqlstore *store = tr->store;
3480 193683 : msk m = s->deleted;
3481 193683 : return log_constant(store->logger, TYPE_msk, &m, id, s->start, s->end-s->start)==GDK_SUCCEED?LOG_OK:LOG_ERR;
3482 : }
3483 :
3484 : static int
3485 96129 : log_segments(sql_trans *tr, segments *segs, sqlid id)
3486 : {
3487 : /* log segments */
3488 96129 : lock_table(tr->store, id);
3489 493141 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
3490 397012 : unlock_table(tr->store, id);
3491 397012 : if (seg->ts == tr->tid && seg->end-seg->start) {
3492 146961 : if (log_segment(tr, seg, id) != LOG_OK) {
3493 0 : unlock_table(tr->store, id);
3494 0 : return LOG_ERR;
3495 : }
3496 : }
3497 397012 : lock_table(tr->store, id);
3498 : }
3499 96129 : unlock_table(tr->store, id);
3500 96129 : return LOG_OK;
3501 : }
3502 :
3503 : static int
3504 12291 : log_create_storage(sql_trans *tr, storage *bat, sql_table *t)
3505 : {
3506 12291 : BAT *b;
3507 12291 : int ok = LOG_OK;
3508 :
3509 12291 : if (GDKinmemory(0))
3510 : return LOG_OK;
3511 :
3512 12259 : b = temp_descriptor(bat->cs.bid);
3513 12259 : if (b == NULL)
3514 : return LOG_ERR;
3515 :
3516 12259 : sqlstore *store = tr->store;
3517 12259 : bat_set_access(b, BAT_READ);
3518 12259 : if (ok == LOG_OK)
3519 12259 : ok = (log_bat_persists(store->logger, b, t->base.id) == GDK_SUCCEED)?LOG_OK:LOG_ERR;
3520 12259 : if (ok == LOG_OK)
3521 12259 : ok = log_segments(tr, bat->segs, t->base.id);
3522 12259 : bat_destroy(b);
3523 12259 : return ok;
3524 : }
3525 :
3526 : static int
3527 12305 : log_create_del(sql_trans *tr, sql_change *change)
3528 : {
3529 12305 : int ok = LOG_OK;
3530 12305 : sql_table *t = (sql_table*)change->obj;
3531 :
3532 12305 : if (t->base.deleted)
3533 : return ok;
3534 12291 : assert(!isTempTable(t));
3535 12291 : ok = log_create_storage(tr, ATOMIC_PTR_GET(&t->data), t);
3536 12291 : if (ok == LOG_OK) {
3537 73454 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3538 61163 : sql_column *c = n->data;
3539 :
3540 61163 : ok = log_create_col_(tr, c);
3541 : }
3542 12291 : if (t->idxs) {
3543 18041 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3544 5750 : sql_idx *i = n->data;
3545 :
3546 5750 : if (ATOMIC_PTR_GET(&i->data))
3547 5738 : ok = log_create_idx_(tr, i);
3548 : }
3549 : }
3550 : }
3551 : return ok;
3552 : }
3553 :
3554 : static int
3555 20613 : commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3556 : {
3557 20613 : int ok = LOG_OK;
3558 20613 : sql_table *t = (sql_table*)change->obj;
3559 20613 : storage *dbat = ATOMIC_PTR_GET(&t->data);
3560 :
3561 20613 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
3562 119 : assert(isTempTable(t));
3563 119 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
3564 119 : if (commit_ts) dbat->segs->h->ts = commit_ts;
3565 119 : return ok;
3566 : }
3567 :
3568 20494 : if (!commit_ts) /* rollback handled by ? */
3569 : return ok;
3570 18640 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
3571 18640 : assert(ok == LOG_OK);
3572 18640 : if (ok != LOG_OK)
3573 : return ok;
3574 18640 : merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
3575 18640 : assert(dbat->cs.ts == tr->tid);
3576 18640 : dbat->cs.ts = commit_ts;
3577 18640 : if (ok == LOG_OK) {
3578 128279 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3579 109639 : sql_column *c = n->data;
3580 109639 : sql_delta *delta = ATOMIC_PTR_GET(&c->data);
3581 :
3582 109639 : ok = commit_create_delta(tr, c->t, &c->base, delta, commit_ts, oldest);
3583 : }
3584 18640 : if (t->idxs) {
3585 24403 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3586 5763 : sql_idx *i = n->data;
3587 5763 : sql_delta *delta = ATOMIC_PTR_GET(&i->data);
3588 :
3589 5763 : if (delta)
3590 5751 : ok = commit_create_delta(tr, i->t, &i->base, delta, commit_ts, oldest);
3591 : }
3592 : }
3593 18640 : if (!tr->parent)
3594 18638 : t->base.new = 0;
3595 : }
3596 18640 : if (!tr->parent)
3597 18638 : t->base.new = 0;
3598 : return ok;
3599 : }
3600 :
3601 : static int
3602 18524 : log_destroy_delta(sql_trans *tr, sql_delta *b, sqlid id)
3603 : {
3604 18524 : gdk_return ok = GDK_SUCCEED;
3605 :
3606 18524 : sqlstore *store = tr->store;
3607 18524 : if (!GDKinmemory(0) && b && b->cs.bid)
3608 18521 : ok = log_bat_transient(store->logger, id);
3609 18524 : if (ok == GDK_SUCCEED && !GDKinmemory(0) && b && b->cs.ebid)
3610 25 : ok = log_bat_transient(store->logger, -id);
3611 18524 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3612 : }
3613 :
3614 : static int
3615 168788 : destroy_col(sqlstore *store, sql_column *c)
3616 : {
3617 168788 : (void)store;
3618 168788 : if (ATOMIC_PTR_GET(&c->data))
3619 168788 : destroy_delta(ATOMIC_PTR_GET(&c->data), true);
3620 168788 : ATOMIC_PTR_SET(&c->data, NULL);
3621 168788 : return LOG_OK;
3622 : }
3623 :
3624 : static int
3625 16665 : log_destroy_col_(sql_trans *tr, sql_column *c)
3626 : {
3627 16665 : int ok = LOG_OK;
3628 16665 : assert(!isTempTable(c->t));
3629 16665 : if (!tr->parent) /* don't write save point commits */
3630 16665 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&c->data), c->base.id);
3631 16665 : return ok;
3632 : }
3633 :
3634 : static int
3635 55 : log_destroy_col(sql_trans *tr, sql_change *change)
3636 : {
3637 55 : sql_column *c = (sql_column*)change->obj;
3638 55 : int res = log_destroy_col_(tr, c);
3639 55 : change->obj = NULL;
3640 55 : column_destroy(tr->store, c);
3641 55 : return res;
3642 : }
3643 :
3644 : static int
3645 10825 : destroy_idx(sqlstore *store, sql_idx *i)
3646 : {
3647 10825 : (void)store;
3648 10825 : if (ATOMIC_PTR_GET(&i->data))
3649 10825 : destroy_delta(ATOMIC_PTR_GET(&i->data), true);
3650 10825 : ATOMIC_PTR_SET(&i->data, NULL);
3651 10825 : return LOG_OK;
3652 : }
3653 :
3654 : static int
3655 1921 : log_destroy_idx_(sql_trans *tr, sql_idx *i)
3656 : {
3657 1921 : int ok = LOG_OK;
3658 1921 : assert(!isTempTable(i->t));
3659 1921 : if (ATOMIC_PTR_GET(&i->data)) {
3660 1859 : if (!tr->parent) /* don't write save point commits */
3661 1859 : ok = log_destroy_delta(tr, ATOMIC_PTR_GET(&i->data), i->base.id);
3662 : }
3663 1921 : return ok;
3664 : }
3665 :
3666 : static int
3667 501 : log_destroy_idx(sql_trans *tr, sql_change *change)
3668 : {
3669 501 : sql_idx *i = (sql_idx*)change->obj;
3670 501 : int res = log_destroy_idx_(tr, i);
3671 501 : change->obj = NULL;
3672 501 : idx_destroy(tr->store, i);
3673 501 : return res;
3674 : }
3675 :
3676 : static int
3677 26738 : destroy_del(sqlstore *store, sql_table *t)
3678 : {
3679 26738 : (void)store;
3680 26738 : if (ATOMIC_PTR_GET(&t->data))
3681 26728 : destroy_storage(ATOMIC_PTR_GET(&t->data));
3682 26738 : ATOMIC_PTR_SET(&t->data, NULL);
3683 26738 : return LOG_OK;
3684 : }
3685 :
3686 : static int
3687 3158 : log_destroy_storage(sql_trans *tr, storage *bat, sqlid id)
3688 : {
3689 3158 : gdk_return ok = GDK_SUCCEED;
3690 :
3691 3158 : sqlstore *store = tr->store;
3692 3158 : if (!GDKinmemory(0) && !tr->parent && /* don't write save point commits */
3693 3158 : bat && bat->cs.bid)
3694 3158 : ok = log_bat_transient(store->logger, id);
3695 3158 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3696 : }
3697 :
3698 : static int
3699 3158 : log_destroy_del(sql_trans *tr, sql_change *change)
3700 : {
3701 3158 : int ok = LOG_OK;
3702 3158 : sql_table *t = (sql_table*)change->obj;
3703 :
3704 3158 : assert(!isTempTable(t));
3705 3158 : ok = log_destroy_storage(tr, ATOMIC_PTR_GET(&t->data), t->base.id);
3706 3158 : if (ok == LOG_OK) {
3707 19768 : for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
3708 16610 : sql_column *c = n->data;
3709 :
3710 16610 : ok = log_destroy_col_(tr, c);
3711 : }
3712 3158 : if (t->idxs) {
3713 4578 : for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
3714 1420 : sql_idx *i = n->data;
3715 :
3716 1420 : ok = log_destroy_idx_(tr, i);
3717 : }
3718 : }
3719 : }
3720 3158 : return ok;
3721 : }
3722 :
3723 : static int
3724 3796 : commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
3725 : {
3726 3796 : (void)tr;
3727 3796 : (void)change;
3728 3796 : (void)commit_ts;
3729 3796 : (void)oldest;
3730 3796 : if (commit_ts)
3731 3781 : change->handled = true;
3732 3796 : return 0;
3733 : }
3734 :
3735 : static int
3736 3231 : drop_del(sql_trans *tr, sql_table *t)
3737 : {
3738 3231 : int ok = LOG_OK;
3739 :
3740 3231 : if (!isNew(t)) {
3741 3231 : storage *bat = ATOMIC_PTR_GET(&t->data);
3742 3298 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_destroy_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_destroy_del);
3743 : }
3744 3231 : return ok;
3745 : }
3746 :
3747 : static int
3748 63 : drop_col(sql_trans *tr, sql_column *c)
3749 : {
3750 63 : assert(!isNew(c));
3751 63 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
3752 63 : trans_add(tr, &c->base, d, &tc_gc_col, &commit_destroy_del, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_destroy_col);
3753 63 : return LOG_OK;
3754 : }
3755 :
3756 : static int
3757 502 : drop_idx(sql_trans *tr, sql_idx *i)
3758 : {
3759 502 : assert(!isNew(i));
3760 502 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
3761 502 : trans_add(tr, &i->base, d, &tc_gc_idx, &commit_destroy_del, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_destroy_idx);
3762 502 : return LOG_OK;
3763 : }
3764 :
3765 :
3766 : static BUN
3767 129620 : clear_cs(sql_trans *tr, column_storage *cs, bool renew, bool temp)
3768 : {
3769 129620 : BAT *b;
3770 129620 : BUN sz = 0;
3771 :
3772 129620 : (void)tr;
3773 129620 : assert(cs->st == ST_DEFAULT || cs->st == ST_DICT || cs->st == ST_FOR);
3774 129620 : if (cs->bid && renew) {
3775 129621 : b = quick_descriptor(cs->bid);
3776 129612 : if (b) {
3777 129612 : sz += BATcount(b);
3778 129612 : if (cs->st == ST_DICT) {
3779 2 : bat nebid = temp_copy(cs->ebid, true, temp); /* create empty copy */
3780 2 : BAT *n = COLnew(0, TYPE_bte, 0, PERSISTENT);
3781 :
3782 2 : if (nebid == BID_NIL || !n) {
3783 0 : temp_destroy(nebid);
3784 0 : bat_destroy(n);
3785 0 : return BUN_NONE;
3786 : }
3787 2 : temp_destroy(cs->ebid);
3788 2 : cs->ebid = nebid;
3789 2 : if (!temp)
3790 2 : bat_set_access(n, BAT_READ);
3791 2 : temp_destroy(cs->bid);
3792 2 : cs->bid = temp_create(n); /* create empty copy */
3793 2 : bat_destroy(n);
3794 : } else {
3795 129610 : bat nbid = temp_copy(cs->bid, true, false); /* create empty copy */
3796 :
3797 129604 : if (nbid == BID_NIL)
3798 : return BUN_NONE;
3799 129604 : temp_destroy(cs->bid);
3800 129591 : cs->bid = nbid;
3801 : }
3802 : } else {
3803 : return BUN_NONE;
3804 : }
3805 : }
3806 129592 : if (cs->uibid) {
3807 129449 : temp_destroy(cs->uibid);
3808 129482 : cs->uibid = 0;
3809 : }
3810 129625 : if (cs->uvbid) {
3811 129482 : temp_destroy(cs->uvbid);
3812 129482 : cs->uvbid = 0;
3813 : }
3814 129625 : cs->cleared = true;
3815 129625 : cs->ucnt = 0;
3816 129625 : return sz;
3817 : }
3818 :
3819 : static BUN
3820 129474 : clear_col(sql_trans *tr, sql_column *c, bool renew)
3821 : {
3822 129474 : bool update_conflict = false;
3823 129474 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
3824 :
3825 129474 : if ((delta = bind_col_data(tr, c, renew?&update_conflict:NULL)) == NULL)
3826 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3827 129475 : assert(c->t->persistence != SQL_DECLARED_TABLE);
3828 129475 : if (odelta != delta)
3829 129480 : trans_add_table(tr, &c->base, c->t, delta, &tc_gc_upd_col, &commit_update_col, NOT_TO_BE_LOGGED(c->t) ? NULL : &log_update_col);
3830 129467 : if (delta)
3831 129467 : return clear_cs(tr, &delta->cs, renew, isTempTable(c->t));
3832 : return 0;
3833 : }
3834 :
3835 : static BUN
3836 21 : clear_idx(sql_trans *tr, sql_idx *i, bool renew)
3837 : {
3838 21 : bool update_conflict = false;
3839 21 : sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
3840 :
3841 21 : if (!isTable(i->t) || (hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
3842 15 : return 0;
3843 6 : if ((delta = bind_idx_data(tr, i, renew?&update_conflict:NULL)) == NULL)
3844 0 : return update_conflict ? BUN_NONE - 1 : BUN_NONE;
3845 6 : assert(i->t->persistence != SQL_DECLARED_TABLE);
3846 6 : if (odelta != delta)
3847 6 : trans_add_table(tr, &i->base, i->t, delta, &tc_gc_upd_idx, &commit_update_idx, NOT_TO_BE_LOGGED(i->t) ? NULL : &log_update_idx);
3848 6 : if (delta)
3849 6 : return clear_cs(tr, &delta->cs, renew, isTempTable(i->t));
3850 : return 0;
3851 : }
3852 :
3853 : static int
3854 141 : clear_storage(sql_trans *tr, sql_table *t, storage *s)
3855 : {
3856 141 : if (clear_cs(tr, &s->cs, true, isTempTable(t)) == BUN_NONE)
3857 : return LOG_ERR;
3858 141 : if (s->segs)
3859 141 : destroy_segments(s->segs);
3860 141 : if (!(s->segs = new_segments(tr, 0)))
3861 : return LOG_ERR;
3862 : return LOG_OK;
3863 : }
3864 :
3865 :
3866 : /*
3867 : * Clear the table, in general this means replacing the storage,
3868 : * but in case of earlier deletes (or inserts by this transaction), we only mark
3869 : * all segments as deleted.
3870 : * this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT
3871 : */
3872 : static BUN
3873 41824 : clear_del(sql_trans *tr, sql_table *t, int in_transaction)
3874 : {
3875 41824 : int clear = !in_transaction, ok = LOG_OK;
3876 41824 : bool conflict = false;
3877 41824 : storage *bat;
3878 :
3879 41875 : if ((bat = bind_del_data(tr, t, clear?&conflict:NULL)) == NULL)
3880 15796 : return conflict?BUN_NONE-1:BUN_NONE;
3881 :
3882 26028 : if (!clear) {
3883 51 : lock_table(tr->store, t->base.id);
3884 51 : ok = delete_range(tr, t, bat, 0, bat->segs->t->end);
3885 51 : unlock_table(tr->store, t->base.id);
3886 : }
3887 26028 : assert(t->persistence != SQL_DECLARED_TABLE);
3888 26028 : if (!in_transaction)
3889 25980 : trans_add_obj(tr, &t->base, bat, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
3890 26028 : if (ok == LOG_ERR)
3891 : return BUN_NONE;
3892 26028 : if (ok == LOG_CONFLICT)
3893 0 : return BUN_NONE - 1;
3894 : return LOG_OK;
3895 : }
3896 :
3897 : /* this function returns BUN_NONE on LOG_ERR and BUN_NONE - 1 on LOG_CONFLICT */
3898 : static BUN
3899 41824 : clear_table(sql_trans *tr, sql_table *t)
3900 : {
3901 41824 : node *n = ol_first_node(t->columns);
3902 41824 : sql_column *c = n->data;
3903 41824 : storage *d = tab_timestamp_storage(tr, t);
3904 41824 : int in_transaction, clear;
3905 41824 : BUN sz, clear_ok;
3906 :
3907 41824 : if (!d)
3908 : return BUN_NONE;
3909 41824 : in_transaction = segments_in_transaction(tr, t);
3910 41824 : clear = !in_transaction;
3911 41824 : sz = count_col(tr, c, CNT_ACTIVE);
3912 41824 : if ((clear_ok = clear_del(tr, t, in_transaction)) >= BUN_NONE - 1)
3913 : return clear_ok;
3914 :
3915 26028 : if (in_transaction)
3916 : return sz;
3917 :
3918 155452 : for (; n; n = n->next) {
3919 129475 : c = n->data;
3920 :
3921 129475 : if ((clear_ok = clear_col(tr, c, clear)) >= BUN_NONE - 1)
3922 0 : return clear_ok;
3923 : }
3924 25977 : if (t->idxs) {
3925 25998 : for (n = ol_first_node(t->idxs); n; n = n->next) {
3926 21 : sql_idx *ci = n->data;
3927 :
3928 21 : if (isTable(ci->t) && idx_has_column(ci->type) &&
3929 21 : (clear_ok = clear_idx(tr, ci, clear)) >= BUN_NONE - 1)
3930 0 : return clear_ok;
3931 : }
3932 : }
3933 : return sz;
3934 : }
3935 :
3936 : static int
3937 158890 : tr_log_cs( sql_trans *tr, sql_table *t, column_storage *cs, segment *segs, sqlid id)
3938 : {
3939 158890 : sqlstore *store = tr->store;
3940 158890 : gdk_return ok = GDK_SUCCEED;
3941 :
3942 158890 : (void) t;
3943 158890 : (void) segs;
3944 158890 : if (GDKinmemory(0))
3945 : return LOG_OK;
3946 :
3947 158883 : if (cs->cleared) {
3948 155506 : assert(cs->ucnt == 0);
3949 155506 : BAT *ins = temp_descriptor(cs->bid);
3950 155506 : if (!ins)
3951 : return LOG_ERR;
3952 155506 : assert(!isEbat(ins));
3953 155506 : bat_set_access(ins, BAT_READ);
3954 155506 : ok = log_bat_persists(store->logger, ins, id);
3955 155506 : bat_destroy(ins);
3956 155506 : if (ok == GDK_SUCCEED && cs->ebid) {
3957 56 : BAT *ins = temp_descriptor(cs->ebid);
3958 56 : if (!ins)
3959 : return LOG_ERR;
3960 56 : assert(!isEbat(ins));
3961 56 : bat_set_access(ins, BAT_READ);
3962 56 : ok = log_bat_persists(store->logger, ins, -id);
3963 56 : bat_destroy(ins);
3964 : }
3965 155506 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3966 : }
3967 :
3968 3377 : assert(!isTempTable(t));
3969 :
3970 3377 : if (ok == GDK_SUCCEED && cs->ucnt && cs->uibid) {
3971 2795 : BAT *ui = temp_descriptor(cs->uibid);
3972 2795 : BAT *uv = temp_descriptor(cs->uvbid);
3973 : /* any updates */
3974 2795 : if (ui == NULL || uv == NULL) {
3975 : ok = GDK_FAIL;
3976 2795 : } else if (BATcount(uv) > uv->batInserted || BATdirty(uv))
3977 2795 : ok = log_delta(store->logger, ui, uv, id);
3978 2795 : bat_destroy(ui);
3979 2795 : bat_destroy(uv);
3980 : }
3981 2795 : return ok == GDK_SUCCEED ? LOG_OK : LOG_ERR;
3982 : }
3983 :
3984 : static inline int
3985 57899 : tr_log_table_start(sql_trans *tr, sql_table *t) {
3986 57899 : sqlstore *store = tr->store;
3987 57899 : return log_bat_group_start(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3988 : }
3989 :
3990 : static inline int
3991 57899 : tr_log_table_end(sql_trans *tr, sql_table *t) {
3992 57899 : sqlstore *store = tr->store;
3993 57899 : return log_bat_group_end(store->logger, t->base.id) == GDK_SUCCEED? LOG_OK: LOG_ERR;
3994 : }
3995 :
3996 : static int
3997 57899 : log_table_append(sql_trans *tr, sql_table *t, segments *segs)
3998 : {
3999 57899 : sqlstore *store = tr->store;
4000 57899 : gdk_return ok = GDK_SUCCEED;
4001 :
4002 57899 : size_t end = segs_end(segs, tr, t);
4003 :
4004 57899 : if (tr_log_table_start(tr, t) != LOG_OK)
4005 : return LOG_ERR;
4006 :
4007 57899 : size_t nr_appends = 0;
4008 :
4009 57899 : lock_table(tr->store, t->base.id);
4010 416761 : for (segment *seg = segs->h; seg; seg=ATOMIC_PTR_GET(&seg->next)) {
4011 358862 : unlock_table(tr->store, t->base.id);
4012 :
4013 358862 : if (seg->ts == tr->tid && seg->end-seg->start) {
4014 116526 : if (!seg->deleted) {
4015 46722 : if (log_segment(tr, seg, t->base.id) != LOG_OK)
4016 : return LOG_ERR;
4017 :
4018 46722 : nr_appends += (seg->end - seg->start);
4019 : }
4020 : }
4021 358862 : lock_table(tr->store, t->base.id);
4022 : }
4023 57899 : unlock_table(tr->store, t->base.id);
4024 :
4025 398675 : for (node *n = ol_first_node(t->columns); n && ok == GDK_SUCCEED; n = n->next) {
4026 340776 : sql_column *c = n->data;
4027 340776 : column_storage *cs = ATOMIC_PTR_GET(&c->data);
4028 :
4029 340776 : if (cs->cleared) {
4030 3 : ok = (tr_log_cs(tr, t, cs, NULL, c->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4031 3 : continue;
4032 : }
4033 :
4034 340773 : lock_table(tr->store, t->base.id);
4035 340773 : if (!cs->cleared) {
4036 2415590 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4037 2074817 : unlock_table(tr->store, t->base.id);
4038 2074817 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4039 : /* append col*/
4040 261769 : BAT *ins = temp_descriptor(cs->bid);
4041 261769 : if (ins == NULL)
4042 : return LOG_ERR;
4043 261769 : assert(BATcount(ins) >= cur->end);
4044 261769 : ok = log_bat(store->logger, ins, c->base.id, cur->start, cur->end-cur->start, nr_appends);
4045 261769 : bat_destroy(ins);
4046 : }
4047 2074817 : lock_table(tr->store, t->base.id);
4048 : }
4049 : }
4050 340773 : unlock_table(tr->store, t->base.id);
4051 :
4052 340773 : if (ok == GDK_SUCCEED && cs->ebid) {
4053 19 : BAT *ins = temp_descriptor(cs->ebid);
4054 19 : if (ins == NULL)
4055 : return LOG_ERR;
4056 19 : if (BATcount(ins) > ins->batInserted)
4057 17 : ok = log_bat(store->logger, ins, -c->base.id, ins->batInserted, BATcount(ins)-ins->batInserted, 0);
4058 19 : BATcommit(ins, BATcount(ins));
4059 19 : bat_destroy(ins);
4060 : }
4061 : }
4062 :
4063 57899 : if (t->idxs) {
4064 63097 : for (node *n = ol_first_node(t->idxs); n && ok == GDK_SUCCEED; n = n->next) {
4065 5198 : sql_idx *i = n->data;
4066 :
4067 5198 : if ((hash_index(i->type) && list_length(i->columns) <= 1) || !idx_has_column(i->type))
4068 4420 : continue;
4069 778 : column_storage *cs = ATOMIC_PTR_GET(&i->data);
4070 :
4071 778 : if (cs) {
4072 778 : if (cs->cleared) {
4073 0 : ok = (tr_log_cs(tr, t, cs, NULL, i->base.id) == LOG_OK)? GDK_SUCCEED : GDK_FAIL;
4074 0 : continue;
4075 : }
4076 :
4077 778 : lock_table(tr->store, t->base.id);
4078 2387 : for (segment *cur = segs->h; cur && ok == GDK_SUCCEED; cur = ATOMIC_PTR_GET(&cur->next)) {
4079 1609 : unlock_table(tr->store, t->base.id);
4080 1609 : if (cur->ts == tr->tid && !cur->deleted && cur->start < end) {
4081 : /* append idx */
4082 730 : BAT *ins = temp_descriptor(cs->bid);
4083 730 : if (ins == NULL)
4084 : return LOG_ERR;
4085 730 : assert(BATcount(ins) >= cur->end);
4086 730 : ok = log_bat(store->logger, ins, i->base.id, cur->start, cur->end-cur->start, nr_appends);
4087 730 : bat_destroy(ins);
4088 : }
4089 1609 : lock_table(tr->store, t->base.id);
4090 : }
4091 778 : unlock_table(tr->store, t->base.id);
4092 : }
4093 : }
4094 : }
4095 :
4096 57899 : if (ok != GDK_SUCCEED || tr_log_table_end(tr, t) != LOG_OK)
4097 0 : return LOG_ERR;
4098 :
4099 : return LOG_OK;
4100 : }
4101 :
4102 : static int
4103 83870 : log_storage(sql_trans *tr, sql_table *t, storage *s)
4104 : {
4105 83870 : int ok = LOG_OK;
4106 83870 : bool cleared = s->cs.cleared;
4107 83870 : if (ok == LOG_OK && cleared)
4108 25971 : ok = tr_log_cs(tr, t, &s->cs, s->segs->h, t->base.id);
4109 25971 : if (ok == LOG_OK)
4110 83870 : ok = log_segments(tr, s->segs, t->base.id);
4111 83870 : if (ok == LOG_OK && !cleared)
4112 57899 : ok = log_table_append(tr, t, s->segs);
4113 83870 : return ok;
4114 : }
4115 :
4116 : static void
4117 302063 : merge_cs( column_storage *cs, const char* caller)
4118 : {
4119 302063 : if (cs->bid && cs->ucnt) {
4120 2803 : BAT *cur = temp_descriptor(cs->bid);
4121 2803 : BAT *ui = temp_descriptor(cs->uibid);
4122 2803 : BAT *uv = temp_descriptor(cs->uvbid);
4123 :
4124 2803 : if (!cur || !ui || !uv) {
4125 0 : bat_destroy(ui);
4126 0 : bat_destroy(uv);
4127 0 : bat_destroy(cur);
4128 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4129 : return;
4130 : }
4131 2803 : assert(BATcount(ui) == BATcount(uv));
4132 :
4133 : /* any updates */
4134 2803 : assert(!isEbat(cur));
4135 2803 : if (BATreplace(cur, ui, uv, true) != GDK_SUCCEED) {
4136 0 : bat_destroy(ui);
4137 0 : bat_destroy(uv);
4138 0 : bat_destroy(cur);
4139 0 : GDKfatal(FATAL_MERGE_FAILURE, caller);
4140 : return;
4141 : }
4142 : /* cleanup the old deltas */
4143 2803 : temp_destroy(cs->uibid);
4144 2803 : temp_destroy(cs->uvbid);
4145 2803 : cs->uibid = e_bat(TYPE_oid);
4146 2803 : cs->uvbid = e_bat(cur->ttype);
4147 2803 : assert(cs->uibid != BID_NIL && cs->uvbid != BID_NIL); // Should be pre-allocated.
4148 2803 : cs->ucnt = 0;
4149 2803 : bat_destroy(ui);
4150 2803 : bat_destroy(uv);
4151 2803 : bat_destroy(cur);
4152 : }
4153 302063 : cs->cleared = false;
4154 302063 : cs->merged = true;
4155 302063 : return;
4156 : }
4157 :
4158 : static void
4159 260603 : merge_delta( sql_delta *obat)
4160 : {
4161 260603 : if (obat && obat->next && !obat->cs.merged)
4162 51927 : merge_delta(obat->next);
4163 260603 : merge_cs(&obat->cs, __func__);
4164 260603 : }
4165 :
4166 : static void
4167 41460 : merge_storage(storage *tdb)
4168 : {
4169 41460 : merge_cs(&tdb->cs, __func__);
4170 :
4171 41460 : if (tdb->next) {
4172 420 : destroy_storage(tdb->next);
4173 420 : tdb->next = NULL;
4174 : }
4175 41460 : }
4176 :
4177 : static sql_delta *
4178 1 : savepoint_commit_delta( sql_delta *delta, ulng commit_ts)
4179 : {
4180 : /* commit ie copy back to the parent transaction */
4181 1 : if (delta && delta->cs.ts == commit_ts && delta->next) {
4182 1 : sql_delta *od = delta->next;
4183 1 : if (od->cs.ts == commit_ts) {
4184 0 : sql_delta t = *od, *n = od->next;
4185 0 : *od = *delta;
4186 0 : od->next = n;
4187 0 : *delta = t;
4188 0 : delta->next = NULL;
4189 0 : destroy_delta(delta, true);
4190 0 : return od;
4191 : }
4192 : }
4193 : return delta;
4194 : }
4195 :
4196 : static int
4197 132890 : log_update_col( sql_trans *tr, sql_change *change)
4198 : {
4199 132890 : sql_column *c = (sql_column*)change->obj;
4200 132890 : assert(!isTempTable(c->t));
4201 :
4202 132890 : if (isDeleted(c->t)) {
4203 0 : change->handled = true;
4204 0 : return LOG_OK;
4205 : }
4206 :
4207 132890 : if (!isDeleted(c->t) && !tr->parent) {/* don't write save point commits */
4208 132890 : storage *s = ATOMIC_PTR_GET(&c->t->data);
4209 132890 : sql_delta *d = ATOMIC_PTR_GET(&c->data);
4210 132890 : return tr_log_cs(tr, c->t, &d->cs, s->segs->h, c->base.id);
4211 : }
4212 : return LOG_OK;
4213 : }
4214 :
4215 : static int
4216 217 : tc_gc_rollbacked( sql_store Store, sql_change *change, ulng oldest)
4217 : {
4218 217 : sqlstore *store = Store;
4219 :
4220 217 : sql_delta *d = (sql_delta*)change->data;
4221 217 : if (d->cs.ts < oldest) {
4222 82 : destroy_delta(d, false);
4223 82 : if (change->commit == &commit_update_idx)
4224 2 : table_destroy(store, ((sql_idx*)change->obj)->t);
4225 : else
4226 80 : table_destroy(store, ((sql_column*)change->obj)->t);
4227 82 : return 1;
4228 : }
4229 135 : if (d->cs.ts > TRANSACTION_ID_BASE)
4230 82 : d->cs.ts = store_get_timestamp(store) + 1;
4231 : return 0;
4232 : }
4233 :
4234 : static int
4235 9 : tc_gc_rollbacked_storage( sql_store Store, sql_change *change, ulng oldest)
4236 : {
4237 9 : sqlstore *store = Store;
4238 :
4239 9 : storage *d = (storage*)change->data;
4240 9 : if (d->cs.ts < oldest) {
4241 3 : destroy_storage(d);
4242 3 : table_destroy(store, (sql_table*)change->obj);
4243 3 : return 1;
4244 : }
4245 6 : if (d->cs.ts > TRANSACTION_ID_BASE)
4246 3 : d->cs.ts = store_get_timestamp(store) + 1;
4247 : return 0;
4248 : }
4249 :
4250 : static int
4251 133008 : commit_update_delta( sql_trans *tr, sql_change *change, sql_table* t, sql_base* base, ATOMIC_PTR_TYPE* data, int type, ulng commit_ts, ulng oldest)
4252 : {
4253 133008 : (void) type; // TODO transaction_layer_revamp remove if remains unused
4254 :
4255 133008 : sql_delta *delta = ATOMIC_PTR_GET(data);
4256 :
4257 133008 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4258 3 : int ok = LOG_OK;
4259 3 : assert(isTempTable(t));
4260 3 : if (clear_cs(tr, &delta->cs, true, isTempTable(t)) == BUN_NONE)
4261 0 : ok = LOG_ERR; /* CA_DELETE as CA_DROP's are gone already (or for globals are equal to a CA_DELETE) */
4262 3 : if (!tr->parent)
4263 0 : t->base.new = base->new = 0;
4264 3 : change->handled = true;
4265 3 : return ok;
4266 : }
4267 :
4268 133005 : if (commit_ts)
4269 132923 : delta->cs.ts = commit_ts;
4270 133005 : if (!commit_ts) { /* rollback */
4271 82 : sql_delta *d = change->data, *o = ATOMIC_PTR_GET(data);
4272 :
4273 82 : if (change->ts && t->base.new) /* handled by create col */
4274 : return LOG_OK;
4275 82 : if (o != d) {
4276 0 : while(o && o->next != d)
4277 : o = o->next;
4278 : }
4279 82 : if (o == ATOMIC_PTR_GET(data))
4280 82 : ATOMIC_PTR_SET(data, d->next);
4281 : else
4282 0 : o->next = d->next;
4283 82 : d->next = NULL;
4284 82 : change->cleanup = &tc_gc_rollbacked;
4285 132923 : } else if (!tr->parent) {
4286 : /* merge deltas */
4287 359761 : while (delta && delta->cs.ts > oldest)
4288 226839 : delta = delta->next;
4289 132922 : if (delta && !delta->cs.merged && delta->cs.ts <= oldest) {
4290 31527 : lock_column(tr->store, base->id); /* lock for concurrent updates (appends) */
4291 31527 : merge_delta(delta);
4292 31527 : unlock_column(tr->store, base->id);
4293 : }
4294 1 : } else if (tr->parent) /* move delta into older and cleanup current save points */
4295 1 : ATOMIC_PTR_SET(data, savepoint_commit_delta(delta, commit_ts));
4296 : return LOG_OK;
4297 : }
4298 :
4299 : static int
4300 132980 : commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4301 : {
4302 :
4303 132980 : sql_column *c = (sql_column*)change->obj;
4304 132980 : sql_base* base = &c->base;
4305 132980 : sql_table* t = c->t;
4306 132980 : ATOMIC_PTR_TYPE* data = &c->data;
4307 132980 : int type = c->type.type->localtype;
4308 :
4309 132980 : if (change->handled || isDeleted(c->t))
4310 : return LOG_OK;
4311 :
4312 132980 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4313 : }
4314 :
4315 : static int
4316 26 : log_update_idx( sql_trans *tr, sql_change *change)
4317 : {
4318 26 : sql_idx *i = (sql_idx*)change->obj;
4319 26 : assert(!isTempTable(i->t));
4320 :
4321 26 : if (isDeleted(i->t)) {
4322 0 : change->handled = true;
4323 0 : return LOG_OK;
4324 : }
4325 :
4326 26 : if (!isDeleted(i->t) && !tr->parent) { /* don't write save point commits */
4327 26 : storage *s = ATOMIC_PTR_GET(&i->t->data);
4328 26 : sql_delta *d = ATOMIC_PTR_GET(&i->data);
4329 26 : return tr_log_cs(tr, i->t, &d->cs, s->segs->h, i->base.id);
4330 : }
4331 : return LOG_OK;
4332 : }
4333 :
4334 : static int
4335 28 : commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4336 : {
4337 28 : sql_idx *i = (sql_idx*)change->obj;
4338 28 : sql_base* base = &i->base;
4339 28 : sql_table* t = i->t;
4340 28 : ATOMIC_PTR_TYPE* data = &i->data;
4341 28 : int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
4342 :
4343 28 : if (change->handled || isDeleted(i->t))
4344 : return LOG_OK;
4345 :
4346 28 : return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
4347 : }
4348 :
4349 : static storage *
4350 26 : savepoint_commit_storage( storage *dbat, ulng commit_ts)
4351 : {
4352 26 : if (dbat && dbat->cs.ts == commit_ts && dbat->next) {
4353 0 : storage *od = dbat->next;
4354 0 : if (od->cs.ts == commit_ts) {
4355 0 : storage t = *od, *n = od->next;
4356 0 : *od = *dbat;
4357 0 : od->next = n;
4358 0 : *dbat = t;
4359 0 : dbat->next = NULL;
4360 0 : destroy_storage(dbat);
4361 0 : return od;
4362 : }
4363 : }
4364 : return dbat;
4365 : }
4366 :
4367 : static int
4368 83870 : log_update_del( sql_trans *tr, sql_change *change)
4369 : {
4370 83870 : sql_table *t = (sql_table*)change->obj;
4371 83870 : assert(!isTempTable(t));
4372 :
4373 83870 : if (isDeleted(t)) {
4374 0 : change->handled = true;
4375 0 : return LOG_OK;
4376 : }
4377 :
4378 83870 : if (!isDeleted(t) && !tr->parent) /* don't write save point commits */
4379 83870 : return log_storage(tr, t, ATOMIC_PTR_GET(&t->data));
4380 : return LOG_OK;
4381 : }
4382 :
4383 : static int
4384 88386 : commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
4385 : {
4386 88386 : int ok = LOG_OK;
4387 88386 : sql_table *t = (sql_table*)change->obj;
4388 88386 : storage *dbat = ATOMIC_PTR_GET(&t->data);
4389 :
4390 88386 : if (change->handled || isDeleted(t))
4391 : return ok;
4392 :
4393 88386 : if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
4394 22 : assert(isTempTable(t));
4395 22 : if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
4396 22 : if (commit_ts) dbat->segs->h->ts = commit_ts;
4397 22 : change->handled = true;
4398 22 : return ok;
4399 : }
4400 :
4401 88364 : lock_table(tr->store, t->base.id);
4402 88364 : if (!commit_ts) { /* rollback */
4403 4194 : if (dbat->cs.ts == tr->tid) {
4404 6 : if (change->ts && t->base.new) { /* handled by the create table */
4405 3 : unlock_table(tr->store, t->base.id);
4406 3 : return ok;
4407 : }
4408 3 : storage *d = change->data, *o = ATOMIC_PTR_GET(&t->data);
4409 :
4410 3 : if (o != d) {
4411 0 : while(o && o->next != d)
4412 : o = o->next;
4413 : }
4414 3 : if (o == ATOMIC_PTR_GET(&t->data)) {
4415 3 : assert(d->next);
4416 3 : ATOMIC_PTR_SET(&t->data, d->next);
4417 : } else
4418 0 : o->next = d->next;
4419 3 : d->next = NULL;
4420 3 : change->cleanup = &tc_gc_rollbacked_storage;
4421 : } else
4422 4188 : rollback_segments(dbat->segs, tr, change, oldest);
4423 84170 : } else if (ok == LOG_OK && !tr->parent) {
4424 84144 : if (dbat->cs.ts == tr->tid) /* cleared table */
4425 25973 : dbat->cs.ts = commit_ts;
4426 :
4427 84144 : ok = segments2cs(tr, dbat->segs, &dbat->cs);
4428 84144 : if (ok == LOG_OK) {
4429 84144 : merge_segments(dbat, tr, change, commit_ts, oldest);
4430 84144 : if (oldest == commit_ts)
4431 41460 : merge_storage(dbat);
4432 : }
4433 84144 : if (dbat)
4434 84144 : dbat->cs.cleared = false;
4435 26 : } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
4436 26 : merge_segments(dbat, tr, change, commit_ts, oldest);
4437 26 : ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
4438 26 : storage *s = change->data;
4439 26 : if (s->cs.ts == tr->tid)
4440 0 : s->cs.ts = commit_ts;
4441 : }
4442 88361 : unlock_table(tr->store, t->base.id);
4443 88361 : return ok;
4444 : }
4445 :
4446 : /* only rollback (content version) case for now */
4447 : static int
4448 215 : tc_gc_col( sql_store Store, sql_change *change, ulng oldest)
4449 : {
4450 215 : sqlstore *store = Store;
4451 215 : sql_column *c = (sql_column*)change->obj;
4452 :
4453 215 : if (!c) /* cleaned earlier */
4454 : return 1;
4455 :
4456 160 : if (change->handled || isDeleted(c->t)) {
4457 0 : column_destroy(store, c);
4458 0 : return 1;
4459 : }
4460 :
4461 : /* savepoint commit (did it merge ?) */
4462 160 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4463 0 : column_destroy(store, c);
4464 0 : return 1;
4465 : }
4466 160 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4467 : return 0;
4468 159 : sql_delta *d = (sql_delta*)change->data;
4469 159 : if (d && d->next) {
4470 :
4471 62 : if (d->cs.ts > oldest)
4472 : return LOG_OK; /* cannot cleanup yet */
4473 :
4474 : // d is oldest reachable delta
4475 59 : if (d->next) // Unreachable can immediately be destroyed.
4476 59 : destroy_delta(d->next, true);
4477 :
4478 59 : d->next = NULL;
4479 59 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4480 59 : merge_delta(d);
4481 59 : unlock_column(store, c->base.id);
4482 : }
4483 156 : column_destroy(store, c);
4484 156 : return 1;
4485 : }
4486 :
4487 : static int
4488 874388 : tc_gc_upd_col( sql_store Store, sql_change *change, ulng oldest)
4489 : {
4490 874388 : sqlstore *store = Store;
4491 874388 : sql_column *c = (sql_column*)change->obj;
4492 :
4493 874388 : if (!c) /* cleaned earlier */
4494 : return 1;
4495 :
4496 874388 : if (change->handled || isDeleted(c->t)) {
4497 3 : table_destroy(store, c->t);
4498 3 : return 1;
4499 : }
4500 :
4501 : /* savepoint commit (did it merge ?) */
4502 874385 : if (ATOMIC_PTR_GET(&c->data) != change->data) { /* data is freed by commit */
4503 71883 : table_destroy(store, c->t);
4504 71883 : return 1;
4505 : }
4506 802502 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4507 : return 0;
4508 802501 : sql_delta *d = (sql_delta*)change->data;
4509 802501 : if (d && d->next) {
4510 :
4511 802501 : if (d->cs.ts > oldest)
4512 : return LOG_OK; /* cannot cleanup yet */
4513 :
4514 : // d is oldest reachable delta
4515 60954 : if (d->next) // Unreachable can immediately be destroyed.
4516 60954 : destroy_delta(d->next, true);
4517 :
4518 60954 : d->next = NULL;
4519 60954 : lock_column(store, c->base.id); /* lock for concurrent updates (appends) */
4520 60954 : merge_delta(d);
4521 60954 : unlock_column(store, c->base.id);
4522 : }
4523 60954 : table_destroy(store, c->t);
4524 60954 : return 1;
4525 : }
4526 :
4527 : static int
4528 1133 : tc_gc_idx( sql_store Store, sql_change *change, ulng oldest)
4529 : {
4530 1133 : sqlstore *store = Store;
4531 1133 : sql_idx *i = (sql_idx*)change->obj;
4532 :
4533 1133 : if (!i) /* cleaned earlier */
4534 : return 1;
4535 :
4536 632 : if (change->handled || isDeleted(i->t)) {
4537 0 : idx_destroy(store, i);
4538 0 : return 1;
4539 : }
4540 :
4541 : /* savepoint commit (did it merge ?) */
4542 632 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4543 0 : idx_destroy(store, i);
4544 0 : return 1;
4545 : }
4546 632 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4547 : return 0;
4548 632 : sql_delta *d = (sql_delta*)change->data;
4549 632 : if (d->next) {
4550 :
4551 0 : if (d->cs.ts > oldest)
4552 : return LOG_OK; /* cannot cleanup yet */
4553 :
4554 : // d is oldest reachable delta
4555 0 : if (d->next) // Unreachable can immediately be destroyed.
4556 0 : destroy_delta(d->next, true);
4557 :
4558 0 : d->next = NULL;
4559 0 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4560 0 : merge_delta(d);
4561 0 : unlock_column(store, i->base.id);
4562 : }
4563 632 : idx_destroy(store, i);
4564 632 : return 1;
4565 : }
4566 :
4567 : static int
4568 26 : tc_gc_upd_idx( sql_store Store, sql_change *change, ulng oldest)
4569 : {
4570 26 : sqlstore *store = Store;
4571 26 : sql_idx *i = (sql_idx*)change->obj;
4572 :
4573 26 : if (!i) /* cleaned earlier */
4574 : return 1;
4575 :
4576 26 : if (change->handled || isDeleted(i->t)) {
4577 0 : table_destroy(store, i->t);
4578 0 : return 1;
4579 : }
4580 :
4581 : /* savepoint commit (did it merge ?) */
4582 26 : if (ATOMIC_PTR_GET(&i->data) != change->data) { /* data is freed by commit */
4583 0 : table_destroy(store, i->t);
4584 0 : return 1;
4585 : }
4586 26 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4587 : return 0;
4588 26 : sql_delta *d = (sql_delta*)change->data;
4589 26 : if (d->next) {
4590 :
4591 26 : if (d->cs.ts > oldest)
4592 : return LOG_OK; /* cannot cleanup yet */
4593 :
4594 : // d is oldest reachable delta
4595 26 : if (d->next) // Unreachable can immediately be destroyed.
4596 26 : destroy_delta(d->next, true);
4597 :
4598 26 : d->next = NULL;
4599 26 : lock_column(store, i->base.id); /* lock for concurrent updates (appends) */
4600 26 : merge_delta(d);
4601 26 : unlock_column(store, i->base.id);
4602 : }
4603 26 : table_destroy(store, i->t);
4604 26 : return 1;
4605 : }
4606 :
4607 : static int
4608 247816 : tc_gc_del( sql_store Store, sql_change *change, ulng oldest)
4609 : {
4610 247816 : sqlstore *store = Store;
4611 247816 : sql_table *t = (sql_table*)change->obj;
4612 :
4613 247816 : if (change->handled || isDeleted(t)) {
4614 3404 : table_destroy(store, t);
4615 3404 : return 1;
4616 : }
4617 : /* savepoint commit (did it merge ?) */
4618 244412 : if (ATOMIC_PTR_GET(&t->data) != change->data) { /* data is freed by commit */
4619 14396 : table_destroy(store, t);
4620 14396 : return 1;
4621 : }
4622 230016 : if (oldest && oldest >= TRANSACTION_ID_BASE) /* cannot cleanup older stuff on savepoint commits */
4623 : return 0;
4624 229988 : storage *d = (storage*)change->data;
4625 229988 : if (d->next) {
4626 158812 : if (d->cs.ts > oldest)
4627 : return LOG_OK; /* cannot cleanup yet */
4628 :
4629 11158 : destroy_storage(d->next);
4630 11158 : d->next = NULL;
4631 : }
4632 82334 : table_destroy(store, t);
4633 82334 : return 1;
4634 : }
4635 :
4636 : static int
4637 30397 : add_offsets(BUN slot, size_t nr, size_t total, BUN *offset, BAT **offsets)
4638 : {
4639 30397 : if (nr == 0)
4640 : return LOG_OK;
4641 30397 : assert (nr > 0);
4642 30397 : if ((!offsets || !*offsets) && nr == total) {
4643 30368 : *offset = slot;
4644 30368 : return LOG_OK;
4645 : }
4646 29 : if (!*offsets) {
4647 7 : *offsets = COLnew(0, TYPE_oid, total, SYSTRANS);
4648 7 : if (!*offsets)
4649 : return LOG_ERR;
4650 : }
4651 29 : oid *restrict dst = Tloc(*offsets, BATcount(*offsets));
4652 16168 : for(size_t i = 0; i < nr; i++)
4653 16139 : dst[i] = slot + i;
4654 29 : (*offsets)->batCount += nr;
4655 29 : (*offsets)->theap->dirty = true;
4656 29 : return LOG_OK;
4657 : }
4658 :
4659 : static int
4660 30375 : claim_segmentsV2(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4661 : {
4662 30375 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4663 30375 : assert(s->segs);
4664 30375 : ulng oldest = store_oldest(tr->store, NULL);
4665 30375 : BUN slot = 0;
4666 30375 : size_t total = cnt;
4667 :
4668 30375 : if (!locked)
4669 30375 : lock_table(tr->store, t->base.id);
4670 : /* naive vacuum approach, iterator through segments, use deleted segments or create new segment at the end */
4671 60882 : for (segment *seg = s->segs->h, *p = NULL; seg && cnt && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4672 30509 : if (seg->deleted && seg->ts < oldest && seg->end > seg->start) { /* re-use old deleted or rolledback append */
4673 35 : if ((seg->end - seg->start) >= cnt) {
4674 : /* if previous is claimed before we could simply adjust the end/start */
4675 13 : if (p && p->ts == tr->tid && !p->deleted) {
4676 2 : slot = p->end;
4677 2 : p->end += cnt;
4678 2 : seg->start += cnt;
4679 2 : if (add_offsets(slot, cnt, total, offset, offsets) != LOG_OK) {
4680 : ok = LOG_ERR;
4681 : break;
4682 : }
4683 2 : cnt = 0;
4684 2 : break;
4685 : }
4686 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4687 11 : size_t rcnt = seg->end - seg->start;
4688 11 : if (rcnt > cnt)
4689 : rcnt = cnt;
4690 11 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, rcnt, false)) == NULL) {
4691 : ok = LOG_ERR;
4692 : break;
4693 : }
4694 : }
4695 33 : seg->ts = tr->tid;
4696 33 : seg->deleted = false;
4697 33 : slot = seg->start;
4698 33 : if (add_offsets(slot, (seg->end-seg->start), total, offset, offsets) != LOG_OK) {
4699 : ok = LOG_ERR;
4700 : break;
4701 : }
4702 33 : cnt -= (seg->end - seg->start);
4703 : }
4704 : }
4705 30375 : if (ok == LOG_OK && cnt) {
4706 30362 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4707 29247 : slot = s->segs->t->end;
4708 29247 : s->segs->t->end += cnt;
4709 : } else {
4710 1115 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4711 : ok = LOG_ERR;
4712 : } else {
4713 1115 : if (!s->segs->h)
4714 0 : s->segs->h = s->segs->t;
4715 1115 : slot = s->segs->t->start;
4716 : }
4717 : }
4718 30362 : if (ok == LOG_OK)
4719 30362 : ok = add_offsets(slot, cnt, total, offset, offsets);
4720 : }
4721 30375 : if (!locked)
4722 30375 : unlock_table(tr->store, t->base.id);
4723 :
4724 30375 : if (ok == LOG_OK) {
4725 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4726 30375 : if (!in_transaction) {
4727 1100 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4728 1100 : in_transaction = true;
4729 : }
4730 30375 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4731 30361 : tr->logchanges += (int) total;
4732 30375 : if (*offsets) {
4733 7 : BAT *pos = *offsets;
4734 7 : assert(BATcount(pos) == total);
4735 7 : BATsetcount(pos, total); /* set other properties */
4736 7 : pos->tnil = false;
4737 7 : pos->tnonil = true;
4738 7 : pos->tkey = true;
4739 7 : pos->tsorted = true;
4740 7 : pos->trevsorted = false;
4741 : }
4742 : }
4743 30375 : return ok;
4744 : }
4745 :
4746 : static int
4747 2053094 : claim_segments(sql_trans *tr, sql_table *t, storage *s, size_t cnt, BUN *offset, BAT **offsets, bool locked)
4748 : {
4749 2053094 : if (cnt > 1 && offsets)
4750 30375 : return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
4751 2022719 : int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
4752 2022719 : assert(s->segs);
4753 2022719 : ulng oldest = store_oldest(tr->store, NULL);
4754 2022719 : BUN slot = 0;
4755 2022719 : int reused = 0;
4756 :
4757 2022719 : if (!locked)
4758 1901661 : lock_table(tr->store, t->base.id);
4759 : /* naive vacuum approach, iterator through segments, check for large enough deleted segments
4760 : * or create new segment at the end */
4761 8519879 : for (segment *seg = s->segs->h, *p = NULL; seg && ok == LOG_OK; p = seg, seg = ATOMIC_PTR_GET(&seg->next)) {
4762 6571155 : if (seg->deleted && seg->ts < oldest && (seg->end-seg->start) >= cnt) { /* re-use old deleted or rolledback append */
4763 :
4764 73995 : if ((seg->end - seg->start) >= cnt) {
4765 :
4766 : /* if previous is claimed before we could simply adjust the end/start */
4767 73995 : if (p && p->ts == tr->tid && !p->deleted) {
4768 58513 : slot = p->end;
4769 58513 : p->end += cnt;
4770 58513 : seg->start += cnt;
4771 58513 : reused = 1;
4772 58513 : break;
4773 : }
4774 : /* we claimed part of the old segment, the split off part needs to stay deleted */
4775 15482 : if ((seg=split_segment(s->segs, seg, p, tr, seg->start, cnt, false)) == NULL) {
4776 : ok = LOG_ERR;
4777 : break;
4778 : }
4779 : }
4780 15482 : seg->ts = tr->tid;
4781 15482 : seg->deleted = false;
4782 15482 : slot = seg->start;
4783 15482 : reused = 1;
4784 15482 : break;
4785 : }
4786 : }
4787 2022719 : if (ok == LOG_OK && !reused) {
4788 1948724 : if (s->segs->t && s->segs->t->ts == tr->tid && !s->segs->t->deleted) {
4789 1913775 : slot = s->segs->t->end;
4790 1913775 : s->segs->t->end += cnt;
4791 : } else {
4792 34949 : if (!(s->segs->t = new_segment(s->segs->t, tr, cnt))) {
4793 : ok = LOG_ERR;
4794 : } else {
4795 34949 : if (!s->segs->h)
4796 0 : s->segs->h = s->segs->t;
4797 34949 : slot = s->segs->t->start;
4798 : }
4799 : }
4800 : }
4801 2022719 : if (!locked)
4802 1901661 : unlock_table(tr->store, t->base.id);
4803 :
4804 2022719 : if (ok == LOG_OK) {
4805 : /* hard to only add this once per transaction (probably want to change to once per new segment) */
4806 2022719 : if (!in_transaction) {
4807 48553 : trans_add_obj(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
4808 48553 : in_transaction = true;
4809 : }
4810 2022719 : if (in_transaction && !NOT_TO_BE_LOGGED(t))
4811 2022292 : tr->logchanges += (int) cnt;
4812 2022719 : *offset = slot;
4813 : }
4814 : return ok;
4815 : }
4816 :
4817 : /*
4818 : * Claim cnt slots to store the tuples. The claim_tab should claim storage on the level
4819 : * of the global transaction and mark the newly added storage slots unused on the global
4820 : * level but used on the local transaction level. Besides this the local transaction needs
4821 : * to update (and mark unused) any slot inbetween the old end and new slots.
4822 : * */
4823 : static int
4824 1932036 : claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4825 : {
4826 1932036 : storage *s;
4827 :
4828 : /* we have a single segment structure for each persistent table
4829 : * for temporary tables each has its own */
4830 1932036 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4831 : return LOG_ERR;
4832 :
4833 1932036 : return claim_segments(tr, t, s, cnt, offset, offsets, false); /* find slot(s) */
4834 : }
4835 :
4836 : /* some tables cannot be updated concurrently (user/roles etc) */
4837 : static int
4838 121062 : key_claim_tab(sql_trans *tr, sql_table *t, size_t cnt, BUN *offset, BAT **offsets)
4839 : {
4840 121062 : storage *s;
4841 121062 : int res = 0;
4842 :
4843 : /* we have a single segment structure for each persistent table
4844 : * for temporary tables each has its own */
4845 121062 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4846 : /* TODO check for other inserts ! */
4847 : return LOG_ERR;
4848 :
4849 121062 : lock_table(tr->store, t->base.id);
4850 121062 : if ((res = segments_conflict(tr, s->segs, 1))) {
4851 4 : unlock_table(tr->store, t->base.id);
4852 4 : return LOG_CONFLICT;
4853 : }
4854 121058 : res = claim_segments(tr, t, s, cnt, offset, offsets, true); /* find slot(s) */
4855 121058 : unlock_table(tr->store, t->base.id);
4856 121058 : return res;
4857 : }
4858 :
4859 : static int
4860 14767 : tab_validate(sql_trans *tr, sql_table *t, int uncommitted)
4861 : {
4862 14767 : storage *s;
4863 14767 : int res = 0;
4864 :
4865 14767 : if ((s = bind_del_data(tr, t, NULL)) == NULL)
4866 : return LOG_ERR;
4867 :
4868 14767 : lock_table(tr->store, t->base.id);
4869 14767 : res = segments_conflict(tr, s->segs, uncommitted);
4870 14767 : unlock_table(tr->store, t->base.id);
4871 14767 : return res ? LOG_CONFLICT : LOG_OK;
4872 : }
4873 :
4874 : static size_t
4875 1353368 : has_deletes_in_range( segment *s, sql_trans *tr, BUN start, BUN end)
4876 : {
4877 1353368 : size_t cnt = 0;
4878 :
4879 1434552 : for(;s && s->end <= start; s = ATOMIC_PTR_GET(&s->next))
4880 : ;
4881 :
4882 3422360 : for(;s && s->start < end && !cnt; s = ATOMIC_PTR_GET(&s->next)) {
4883 2068991 : if (SEG_IS_DELETED(s, tr)) /* assume aligned s->end and end */
4884 121307 : cnt += s->end - s->start;
4885 : }
4886 1353369 : return cnt;
4887 : }
4888 :
4889 : static BAT *
4890 1353386 : segments2cands(storage *S, sql_trans *tr, sql_table *t, size_t start, size_t end)
4891 : {
4892 1353386 : lock_table(tr->store, t->base.id);
4893 1353369 : segment *s = S->segs->h;
4894 : /* step one no deletes -> dense range */
4895 1353369 : uint32_t cur = 0;
4896 1353369 : size_t dnr = has_deletes_in_range(s, tr, start, end), nr = end - start, pos = 0;
4897 1353369 : if (!dnr) {
4898 1247033 : unlock_table(tr->store, t->base.id);
4899 1247062 : return BATdense(start, start, end-start);
4900 : }
4901 :
4902 106336 : BAT *b = COLnew(0, TYPE_msk, nr, SYSTRANS), *bn = NULL;
4903 106336 : if (!b) {
4904 0 : unlock_table(tr->store, t->base.id);
4905 0 : return NULL;
4906 : }
4907 :
4908 106336 : uint32_t *restrict dst = Tloc(b, 0);
4909 59483111 : for( ; s; s=ATOMIC_PTR_GET(&s->next)) {
4910 59434042 : if (s->end < start)
4911 43398 : continue;
4912 59390644 : if (s->start >= end)
4913 : break;
4914 59333377 : msk m = (SEG_IS_VALID(s, tr));
4915 59333377 : size_t lnr = s->end-s->start;
4916 59333377 : if (s->start < start)
4917 6933 : lnr -= (start - s->start);
4918 59333377 : if (s->end > end)
4919 5091 : lnr -= s->end - end;
4920 :
4921 59333377 : if (m) {
4922 992246 : size_t used = pos&31, end = 32;
4923 992246 : if (used) {
4924 861824 : if (lnr < (32-used))
4925 501348 : end = used + lnr;
4926 861824 : assert(end > used);
4927 861824 : cur |= ((1U << (end - used)) - 1) << used;
4928 861824 : lnr -= end - used;
4929 861824 : pos += end - used;
4930 861824 : if (end == 32) {
4931 360476 : *dst++ = cur;
4932 360476 : cur = 0;
4933 : }
4934 : }
4935 992246 : size_t full = lnr/32;
4936 992246 : size_t rest = lnr%32;
4937 992246 : if (full > 0) {
4938 279420 : memset(dst, ~0, full * sizeof(*dst));
4939 279420 : dst += full;
4940 279420 : lnr -= full * 32;
4941 279420 : pos += full * 32;
4942 : }
4943 992246 : if (rest > 0) {
4944 460075 : cur |= (1U << rest) - 1;
4945 460075 : lnr -= rest;
4946 460075 : pos += rest;
4947 : }
4948 992246 : assert(lnr==0);
4949 : } else {
4950 58341131 : size_t used = pos&31, end = 32;
4951 58341131 : if (used) {
4952 56506046 : if (lnr < (32-used))
4953 54589916 : end = used + lnr;
4954 :
4955 56506046 : pos+= (end-used);
4956 56506046 : lnr-= (end-used);
4957 56506046 : if (end == 32) {
4958 1916130 : *dst++ = cur;
4959 1916130 : cur = 0;
4960 : }
4961 : }
4962 58341131 : size_t full = lnr/32;
4963 58341131 : size_t rest = lnr%32;
4964 58341131 : memset(dst, 0, full * sizeof(*dst));
4965 58341131 : dst += full;
4966 58341131 : lnr -= full * 32;
4967 58341131 : pos += full * 32;
4968 58341131 : pos+= rest;
4969 58341131 : lnr-= rest;
4970 58341131 : assert(lnr==0);
4971 : }
4972 : }
4973 :
4974 106336 : unlock_table(tr->store, t->base.id);
4975 106336 : if (pos%32)
4976 101916 : *dst=cur;
4977 106336 : BATsetcount(b, nr);
4978 106335 : bn = BATmaskedcands(start, nr, b, true);
4979 106336 : BBPreclaim(b);
4980 106336 : (void)pos;
4981 106336 : assert (pos == nr);
4982 : return bn;
4983 : }
4984 :
4985 : static void * /* BAT * */
4986 1360645 : bind_cands(sql_trans *tr, sql_table *t, int nr_of_parts, int part_nr)
4987 : {
4988 : /* with nr_of_parts - part_nr we can adjust parts */
4989 1360645 : storage *s = tab_timestamp_storage(tr, t);
4990 :
4991 1360634 : if (!s)
4992 : return NULL;
4993 1360634 : size_t nr = segs_end(s->segs, tr, t);
4994 :
4995 1360623 : if (!nr)
4996 7237 : return BATdense(0, 0, 0);
4997 :
4998 : /* compute proper part */
4999 1353386 : size_t part_size = nr/nr_of_parts;
5000 1353386 : size_t start = part_size * part_nr;
5001 1353386 : size_t end = start + part_size;
5002 1353386 : if (part_nr == (nr_of_parts-1))
5003 1283777 : end = nr;
5004 1353386 : assert(end <= nr);
5005 1353386 : return segments2cands(s, tr, t, start, end);
5006 : }
5007 :
5008 : static int
5009 1 : swap_bats(sql_trans *tr, sql_column *col, BAT *bn)
5010 : {
5011 1 : bool update_conflict = false;
5012 :
5013 1 : if (segments_in_transaction(tr, col->t))
5014 : return LOG_CONFLICT;
5015 :
5016 1 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5017 :
5018 1 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5019 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5020 1 : assert(d && d->cs.ts == tr->tid);
5021 1 : if (odelta != d)
5022 1 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t)?NULL:&log_update_col);
5023 1 : if (d->cs.bid)
5024 1 : temp_destroy(d->cs.bid);
5025 1 : if (d->cs.uibid)
5026 1 : temp_destroy(d->cs.uibid);
5027 1 : if (d->cs.uvbid)
5028 1 : temp_destroy(d->cs.uvbid);
5029 1 : bat_set_access(bn, BAT_READ);
5030 1 : d->cs.bid = temp_create(bn);
5031 1 : d->cs.uibid = 0;
5032 1 : d->cs.uvbid = 0;
5033 1 : d->cs.ucnt = 0;
5034 1 : d->cs.cleared = true;
5035 1 : d->cs.ts = tr->tid;
5036 1 : ATOMIC_INIT(&d->cs.refcnt, 1);
5037 1 : return LOG_OK;
5038 : }
5039 :
5040 : static int
5041 58 : col_compress(sql_trans *tr, sql_column *col, storage_type st, BAT *o, BAT *u)
5042 : {
5043 58 : bool update_conflict = false;
5044 :
5045 58 : if (segments_in_transaction(tr, col->t))
5046 : return LOG_CONFLICT;
5047 :
5048 58 : sql_delta *d = NULL, *odelta = ATOMIC_PTR_GET(&col->data);
5049 :
5050 58 : if ((d = bind_col_data(tr, col, &update_conflict)) == NULL)
5051 0 : return update_conflict ? LOG_CONFLICT : LOG_ERR;
5052 58 : assert(d && d->cs.ts == tr->tid);
5053 58 : assert(col->t->persistence != SQL_DECLARED_TABLE);
5054 58 : if (odelta != d)
5055 58 : trans_add_obj(tr, &col->base, d, &tc_gc_col, &commit_update_col, NOT_TO_BE_LOGGED(col->t) ? NULL : &log_update_col);
5056 :
5057 58 : d->cs.st = st;
5058 58 : d->cs.cleared = true;
5059 58 : if (d->cs.bid)
5060 58 : temp_destroy(d->cs.bid);
5061 58 : o = transfer_to_systrans(o);
5062 58 : if (o == NULL)
5063 : return LOG_ERR;
5064 58 : bat_set_access(o, BAT_READ);
5065 58 : d->cs.bid = temp_create(o);
5066 58 : if (u) {
5067 53 : if (d->cs.ebid)
5068 0 : temp_destroy(d->cs.ebid);
5069 53 : u = transfer_to_systrans(u);
5070 53 : if (u == NULL)
5071 : return LOG_ERR;
5072 53 : d->cs.ebid = temp_create(u);
5073 : }
5074 : return LOG_OK;
5075 : }
5076 :
5077 : void
5078 341 : bat_storage_init( store_functions *sf)
5079 : {
5080 341 : sf->bind_col = &bind_col;
5081 341 : sf->bind_updates = &bind_updates;
5082 341 : sf->bind_updates_idx = &bind_updates_idx;
5083 341 : sf->bind_idx = &bind_idx;
5084 341 : sf->bind_cands = &bind_cands;
5085 :
5086 341 : sf->claim_tab = &claim_tab;
5087 341 : sf->key_claim_tab = &key_claim_tab;
5088 341 : sf->tab_validate = &tab_validate;
5089 :
5090 341 : sf->append_col = &append_col;
5091 341 : sf->append_idx = &append_idx;
5092 :
5093 341 : sf->update_col = &update_col;
5094 341 : sf->update_idx = &update_idx;
5095 :
5096 341 : sf->delete_tab = &delete_tab;
5097 :
5098 341 : sf->count_del = &count_del;
5099 341 : sf->count_col = &count_col;
5100 341 : sf->count_idx = &count_idx;
5101 341 : sf->dcount_col = &dcount_col;
5102 341 : sf->min_max_col = &min_max_col;
5103 341 : sf->set_stats_col = &set_stats_col;
5104 341 : sf->sorted_col = &sorted_col;
5105 341 : sf->unique_col = &unique_col;
5106 341 : sf->double_elim_col = &double_elim_col;
5107 341 : sf->col_stats = &col_stats;
5108 341 : sf->col_set_range = &col_set_range;
5109 341 : sf->col_not_null = &col_not_null;
5110 :
5111 341 : sf->col_dup = &col_dup;
5112 341 : sf->idx_dup = &idx_dup;
5113 341 : sf->del_dup = &del_dup;
5114 :
5115 341 : sf->create_col = &create_col; /* create and add to change list */
5116 341 : sf->create_idx = &create_idx;
5117 341 : sf->create_del = &create_del;
5118 :
5119 341 : sf->destroy_col = &destroy_col; /* free resources */
5120 341 : sf->destroy_idx = &destroy_idx;
5121 341 : sf->destroy_del = &destroy_del;
5122 :
5123 341 : sf->drop_col = &drop_col; /* add drop to change list */
5124 341 : sf->drop_idx = &drop_idx;
5125 341 : sf->drop_del = &drop_del;
5126 :
5127 341 : sf->clear_table = &clear_table;
5128 :
5129 341 : sf->swap_bats = &swap_bats;
5130 341 : sf->col_compress = &col_compress;
5131 341 : }
5132 :
5133 : #if 0
5134 : static lng
5135 : log_get_nr_inserted(sql_column *fc, lng *offset)
5136 : {
5137 : lng cnt = 0;
5138 :
5139 : if (!fc || GDKinmemory(0))
5140 : return 0;
5141 :
5142 : if (fc->base.atime && fc->base.allocated) {
5143 : sql_delta *fb = fc->data;
5144 : BAT *ins = temp_descriptor(fb->cs.bid);
5145 :
5146 : if (ins && BATcount(ins) > 0 && BATcount(ins) > ins->batInserted) {
5147 : cnt = BATcount(ins) - ins->batInserted;
5148 : }
5149 : bat_destroy(ins);
5150 : }
5151 : return cnt;
5152 : }
5153 :
5154 : static lng
5155 : log_get_nr_deleted(sql_table *ft, lng *offset)
5156 : {
5157 : lng cnt = 0;
5158 :
5159 : if (!ft || GDKinmemory(0))
5160 : return 0;
5161 :
5162 : if (ft->base.atime && ft->base.allocated) {
5163 : storage *fdb = ft->data;
5164 : BAT *db = temp_descriptor(fdb->cs.bid);
5165 :
5166 : if (db && BATcount(db) > 0 && BATcount(db) > db->batInserted) {
5167 : cnt = BATcount(db) - db->batInserted;
5168 : *offset = db->batInserted;
5169 : }
5170 : bat_destroy(db);
5171 : }
5172 : return cnt;
5173 : }
5174 : #endif
|