Mercurial > hg > MonetDB
changeset 86188:ec241b102d85 smart-merge-jan22
Merge with Jan2022.
| author | Aris Koning <aris.koning@monetdbsolutions.com> |
|---|---|
| date | Thu, 28 Jul 2022 16:38:54 +0200 |
| parents | 51f88573403f (diff) aec3dfc4d53b (current diff) |
| children | |
| files | sql/storage/store.c |
| diffstat | 6 files changed, 402 insertions(+), 49 deletions(-) [+] |
line wrap: on
line diff
--- a/sql/include/sql_catalog.h +++ b/sql/include/sql_catalog.h @@ -233,7 +233,7 @@ struct os_iter { /* transaction changes */ typedef int (*tc_valid_fptr) (struct sql_trans *tr, struct sql_change *c/*, ulng commit_ts, ulng oldest*/); typedef int (*tc_log_fptr) (struct sql_trans *tr, struct sql_change *c); /* write changes to the log */ -typedef int (*tc_commit_fptr) (struct sql_trans *tr, struct sql_change *c, ulng commit_ts, ulng oldest);/* commit/rollback changes */ +typedef int (*tc_commit_fptr) (struct sql_trans *tr, struct sql_change *c, ulng commit_ts, ulng oldest, ulng *active);/* commit/rollback changes */ typedef int (*tc_cleanup_fptr) (sql_store store, struct sql_change *c, ulng oldest); /* garbage collection, ie cleanup structures when possible */ typedef void (*destroy_fptr)(sql_store store, sql_base *b); typedef int (*validate_fptr)(struct sql_trans *tr, sql_base *b, int delete);
--- a/sql/storage/bat/bat_storage.c +++ b/sql/storage/bat/bat_storage.c @@ -19,15 +19,15 @@ static int log_update_col( sql_trans *tr, sql_change *c); static int log_update_idx( sql_trans *tr, sql_change *c); static int log_update_del( sql_trans *tr, sql_change *c); -static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest); -static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest); -static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest); +static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest, ulng *active); +static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest, ulng *active); +static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest, ulng *active); static int log_create_col(sql_trans *tr, sql_change *change); static int log_create_idx(sql_trans *tr, sql_change *change); static int log_create_del(sql_trans *tr, sql_change *change); -static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest); -static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest); -static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest); +static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active); +static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active); +static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active); static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest); static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest); static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest); @@ -389,7 +389,7 @@ segments2cs(sql_trans *tr, segments *seg /* TODO return LOG_OK/ERR */ static void -merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active) { segment *cur = s->segs->h, *seg = NULL; for (; cur; cur = cur->next) { @@ -398,24 +398,45 @@ merge_segments(storage *s, sql_trans *tr cur->oldts = 0; cur->ts = commit_ts; } - if (cur->ts <= oldest && cur->ts < TRANSACTION_ID_BASE) { /* possibly merge range */ - if (!seg) { /* skip first */ - seg = cur; - } else if (seg->end == cur->start && seg->deleted == cur->deleted) { - /* merge with previous */ - seg->end = cur->end; - seg->next = cur->next; - if (cur == s->segs->t) - s->segs->t = seg; - if (commit_ts == oldest) - _DELETE(cur); - else - mark4destroy(cur, change, commit_ts); - cur = seg; - } else { - seg = cur; /* begin of new merge */ + if (!seg) { + /* first segment */ + seg = cur; + } + else if (seg->ts < TRANSACTION_ID_BASE) { + /* possible merge since both deleted flags are equal */ + if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) { + int merge = 1; + for (int i = 0; active[i] != 0; i++) { + assert(active[i] != seg->ts && active[i] != cur->ts); + + if (active[i] == tr->ts) + continue; /* pretent that committing transaction has already committed and is no longer active */ + if (seg->ts < active[i] && cur->ts < active[i]) + break; + if (seg->ts > active[i] && cur->ts > active[i]) + continue; + + assert((active[i] > seg->ts && active[i] < cur->ts) || (active[i] < seg->ts && active[i] > cur->ts)); + /* cannot safely merge since there is an active transaction between the segments */ + merge = false; + break; + } + /* merge segments */ + if (merge) { + seg->end = cur->end; + seg->next = cur->next; + if (cur == s->segs->t) + s->segs->t = seg; + if (commit_ts == oldest) + _DELETE(cur); + else + mark4destroy(cur, change, commit_ts); + cur = seg; + continue; + } } } + seg = cur; } } @@ -3015,10 +3036,11 @@ log_create_col(sql_trans *tr, sql_change } static int -commit_create_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest) +commit_create_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest, ulng *active) { int ok = LOG_OK; (void)oldest; + (void)active; if(!isTempTable(c->t)) { sql_delta *delta = ATOMIC_PTR_GET(&c->data); @@ -3036,12 +3058,12 @@ commit_create_col_( sql_trans *tr, sql_c } static int -commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active) { sql_column *c = (sql_column*)change->obj; if (!tr->parent) c->base.new = 0; - return commit_create_col_( tr, c, commit_ts, oldest); + return commit_create_col_( tr, c, commit_ts, oldest, active); } /* will be called for new idx's and when new index columns are created */ @@ -3121,10 +3143,11 @@ log_create_idx(sql_trans *tr, sql_change } static int -commit_create_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest) +commit_create_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest, ulng *active) { int ok = LOG_OK; (void)oldest; + (void)active; if(!isTempTable(i->t)) { sql_delta *delta = ATOMIC_PTR_GET(&i->data); @@ -3141,12 +3164,12 @@ commit_create_idx_( sql_trans *tr, sql_i } static int -commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active) { sql_idx *i = (sql_idx*)change->obj; if (!tr->parent) i->base.new = 0; - return commit_create_idx_(tr, i, commit_ts, oldest); + return commit_create_idx_(tr, i, commit_ts, oldest, active); } static int @@ -3358,7 +3381,7 @@ log_create_del(sql_trans *tr, sql_change } static int -commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active) { int ok = LOG_OK; sql_table *t = (sql_table*)change->obj; @@ -3371,21 +3394,21 @@ commit_create_del( sql_trans *tr, sql_ch assert(ok == LOG_OK); if (ok != LOG_OK) return ok; - merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/); + merge_segments(dbat, tr, change, commit_ts, commit_ts, active); assert(dbat->cs.ts == tr->tid); dbat->cs.ts = commit_ts; if (ok == LOG_OK) { for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) { sql_column *c = n->data; - ok = commit_create_col_(tr, c, commit_ts, oldest); + ok = commit_create_col_(tr, c, commit_ts, oldest, active); } if (t->idxs) { for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) { sql_idx *i = n->data; if (ATOMIC_PTR_GET(&i->data)) - ok = commit_create_idx_(tr, i, commit_ts, oldest); + ok = commit_create_idx_(tr, i, commit_ts, oldest, active); } } if (!tr->parent) @@ -3523,12 +3546,13 @@ log_destroy_del(sql_trans *tr, sql_chang } static int -commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active) { (void)tr; (void)change; (void)commit_ts; (void)oldest; + (void)active; return 0; } @@ -3974,12 +3998,13 @@ log_update_col( sql_trans *tr, sql_chang } static int -commit_update_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest) +commit_update_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest, ulng *active) { int ok = LOG_OK; sql_delta *delta = ATOMIC_PTR_GET(&c->data); (void)oldest; + (void)active; if (isTempTable(c->t)) { if (commit_ts) { /* commit */ if (c->t->commit_action == CA_COMMIT || c->t->commit_action == CA_PRESERVE) { @@ -4033,14 +4058,14 @@ tc_gc_rollbacked_storage( sql_store Stor static int -commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active) { int ok = LOG_OK; sql_column *c = (sql_column*)change->obj; sql_delta *delta = ATOMIC_PTR_GET(&c->data); if (isTempTable(c->t)) - return commit_update_col_(tr, c, commit_ts, oldest); + return commit_update_col_(tr, c, commit_ts, oldest, active); if (commit_ts) delta->cs.ts = commit_ts; if (!commit_ts) { /* rollback */ @@ -4085,13 +4110,14 @@ log_update_idx( sql_trans *tr, sql_chang } static int -commit_update_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest) +commit_update_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest, ulng *active) { int ok = LOG_OK; sql_delta *delta = ATOMIC_PTR_GET(&i->data); int type = (oid_index(i->type))?TYPE_oid:TYPE_lng; (void)oldest; + (void)active; if (isTempTable(i->t)) { if (commit_ts) { /* commit */ if (i->t->commit_action == CA_COMMIT || i->t->commit_action == CA_PRESERVE) { @@ -4114,14 +4140,14 @@ commit_update_idx_( sql_trans *tr, sql_i } static int -commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active) { int ok = LOG_OK; sql_idx *i = (sql_idx*)change->obj; sql_delta *delta = ATOMIC_PTR_GET(&i->data); if (isTempTable(i->t)) - return commit_update_idx_( tr, i, commit_ts, oldest); + return commit_update_idx_( tr, i, commit_ts, oldest, active); if (commit_ts) delta->cs.ts = commit_ts; if (!commit_ts) { /* rollback */ @@ -4198,7 +4224,7 @@ commit_storage(sql_trans *tr, storage *d } static int -commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active) { int ok = LOG_OK; sql_table *t = (sql_table*)change->obj; @@ -4251,11 +4277,11 @@ commit_update_del( sql_trans *tr, sql_ch ok = segments2cs(tr, dbat->segs, &dbat->cs); assert(ok == LOG_OK); if (ok == LOG_OK) - merge_segments(dbat, tr, change, commit_ts, oldest); + merge_segments(dbat, tr, change, commit_ts, oldest, active); if (ok == LOG_OK && dbat == d && oldest == commit_ts) ok = merge_storage(dbat); } else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */ - merge_segments(dbat, tr, change, commit_ts, oldest); + merge_segments(dbat, tr, change, commit_ts, oldest, active); ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts)); } unlock_table(tr->store, t->base.id);
--- a/sql/storage/objectset.c +++ b/sql/storage/objectset.c @@ -607,7 +607,7 @@ tc_gc_objectversion(sql_store store, sql } static int -tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest) +tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active_) { objectversion *ov = (objectversion*)change->data; if (commit_ts) { @@ -615,6 +615,7 @@ tc_commit_objectversion(sql_trans *tr, s ov->ts = commit_ts; change->committed = commit_ts < TRANSACTION_ID_BASE ? true: false; (void)oldest; + (void)active_; if (!tr->parent) change->obj->new = 0; }
--- a/sql/storage/store.c +++ b/sql/storage/store.c @@ -57,6 +57,19 @@ store_oldest(sqlstore *store) return store->oldest; } +static ulng * +store_get_active(sqlstore *store) +{ + + ulng *active = GDKmalloc(sizeof(ulng) * (store->active->cnt + 1)); + node *cur = store->active->h; + for (int i = 0; i < store->active->cnt; i++, cur = cur->next) { + active[i] = ((sql_session*)cur->data)->tr->ts; + } + active[store->active->cnt] = 0; + return active; +} + static ulng store_oldest_pending(sqlstore *store) { @@ -3624,13 +3637,16 @@ sql_trans_rollback(sql_trans *tr, bool c store_lock(store); ulng oldest = store_oldest(store); ulng commit_ts = store_get_timestamp(store); /* use most recent timestamp such that we can cleanup savely */ + ulng *active = store_get_active(store); /* get active transactions (to merge segments) */ for(node *n=nl->h; n; n = n->next) { sql_change *c = n->data; - if (c->commit) - c->commit(tr, c, 0 /* ie rollback */, oldest); + if (c->commit) { + c->commit(tr, c, 0 /* ie rollback */, oldest, active); + } c->ts = commit_ts; } + GDKfree(active); store_pending_changes(store, oldest); for(node *n=nl->h; n; n = n->next) { sql_change *c = n->data; @@ -3974,15 +3990,18 @@ sql_trans_commit(sql_trans *tr) if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent) oldest = commit_ts; store_pending_changes(store, oldest); + ulng *active = store_get_active(store); /* get active transactions (to merge segments) */ for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) { sql_change *c = n->data; - if (c->commit && ok == LOG_OK) - ok = c->commit(tr, c, commit_ts, oldest); + if (c->commit && ok == LOG_OK) { + ok = c->commit(tr, c, commit_ts, oldest, active); + } else c->obj->new = 0; c->ts = commit_ts; } + GDKfree(active); /* when directly flushing: flush logger after changes got applied */ if (flush) { if (ok == LOG_OK) {
--- a/sql/test/concurrent/Tests/All +++ b/sql/test/concurrent/Tests/All @@ -1,2 +1,3 @@ simple_select crash_on_concurrent_use.SF-1411926 +smart-segment-merge
new file mode 100644 --- /dev/null +++ b/sql/test/concurrent/Tests/smart-segment-merge.test @@ -0,0 +1,306 @@ +# INIT + +@connection(id=1, username=monetdb, password=monetdb) +statement ok +CREATE TABLE Test (k int); + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +1 + +@connection(id=1) +statement ok +INSERT INTO Test SELECT value FROM generate_series(1, 11); + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +1 + + +# TEST INSERTS +# the four uncommitted transactions create four new segments, appended to the tail of the segment list; +# the first commit (id=1) does not merge any segments since active transactions prevent it; +# the second commit (id=3) does not merge any segments since there is an uncommitted segment of id=2 between id=1 and id=3; +# the third commit (id=4) is able to merge its segment plus the segment inserted by the transaction id=3, +# as they are next to each other and the active transaction is not able to read them; +# the fourth commit is able to merge everything, as there are no active transactions and all segments are contiguous. + +@connection(id=1) +statement ok +begin transaction + +@connection(id=2, username=monetdb, password=monetdb) +statement ok +begin transaction + +@connection(id=3, username=monetdb, password=monetdb) +statement ok +begin transaction + +@connection(id=4, username=monetdb, password=monetdb) +statement ok +begin transaction + +@connection(id=1) +statement ok +INSERT INTO Test VALUES (11); + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +2 + +@connection(id=2) +statement ok +INSERT INTO Test VALUES (12); + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +3 + +@connection(id=3) +statement ok +INSERT INTO Test VALUES (13), (14); + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +4 + +@connection(id=4) +statement ok +INSERT INTO Test VALUES (15); + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +5 + +@connection(id=1) +statement ok +commit + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +5 + +@connection(id=3) +statement ok +commit + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +5 + +@connection(id=4) +statement ok +commit + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +4 + +@connection(id=1) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,5,6,7,8,9,10,11,13,14,15 + +@connection(id=2) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,5,6,7,8,9,10,12 + +@connection(id=2) +statement ok +commit + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +1 + +@connection(id=1) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,5,6,7,8,9,10,11,12,13,14,15 + + +# TEST DELETES +# the first transaction is read only; +# the second transaction deletes one row, splitting the segment; +# the third transaction deletes another row, creating another split, but cannot merge as it is not contiguous with the second tx; +# the fourth transaction deletes another row between the second and third txs, merging the segments. + +@connection(id=1) +statement ok +begin transaction + +@connection(id=2) +statement ok +DELETE FROM Test WHERE k = 5 + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +3 + +@connection(id=2) +statement ok +DELETE FROM Test WHERE k = 7 + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +5 + +@connection(id=1) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,5,6,7,8,9,10,11,12,13,14,15 + +@connection(id=2) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,6,8,9,10,11,12,13,14,15 + +@connection(id=2) +statement ok +DELETE FROM Test WHERE k = 6 + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +3 + +@connection(id=1) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,5,6,7,8,9,10,11,12,13,14,15 + +@connection(id=2) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,8,9,10,11,12,13,14,15 + +@connection(id=1) +statement ok +commit + +@connection(id=1) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,8,9,10,11,12,13,14,15 + + +# TEST FILLING HOLES +# there are currently 3 segments, with the middle one referring to three deleted rows; +# the first transaction is read-only; +# the second transaction inserts a new row, which splits the deleted segment into two; +# it is not merged with the first segment due to the active transaction; +# the third transaction inserts a new row, splitting the delete segment into another two; +# however, the two new inserted segments are merged, as they are contiguous and the active transactions +# cannot read them; +# the first transaction commits but the segments remain the same since it is read-only; +# the last transaction inserts another row and commits, merging everything back into one segment. + +@connection(id=1) +statement ok +begin transaction + +@connection(id=2) +statement ok +INSERT INTO TEST VALUES (16); + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +4 + +@connection(id=2) +statement ok +INSERT INTO TEST VALUES (17); + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +4 + +@connection(id=1) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,8,9,10,11,12,13,14,15 + +@connection(id=2) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,16,17,8,9,10,11,12,13,14,15 + +@connection(id=1) +statement ok +commit + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +4 + +@connection(id=1) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,16,17,8,9,10,11,12,13,14,15 + +@connection(id=2) +statement ok +INSERT INTO TEST VALUES (18); + +@connection(id=1) +query I +SELECT segments FROM sys.deltas('sys', 'test'); +---- +1 + +@connection(id=1) +query T +SELECT listagg(k) FROM test; +---- +1,2,3,4,16,17,18,8,9,10,11,12,13,14,15 + + +# CLEANUP + +@connection(id=1) +statement ok +DROP TABLE Test;
