changeset 86188:ec241b102d85 smart-merge-jan22

Merge with Jan2022.
author Aris Koning <aris.koning@monetdbsolutions.com>
date Thu, 28 Jul 2022 16:38:54 +0200
parents 51f88573403f (diff) aec3dfc4d53b (current diff)
children
files sql/storage/store.c
diffstat 6 files changed, 402 insertions(+), 49 deletions(-) [+]
line wrap: on
line diff
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -233,7 +233,7 @@ struct os_iter {
 /* transaction changes */
 typedef int (*tc_valid_fptr) (struct sql_trans *tr, struct sql_change *c/*, ulng commit_ts, ulng oldest*/);
 typedef int (*tc_log_fptr) (struct sql_trans *tr, struct sql_change *c);								/* write changes to the log */
-typedef int (*tc_commit_fptr) (struct sql_trans *tr, struct sql_change *c, ulng commit_ts, ulng oldest);/* commit/rollback changes */
+typedef int (*tc_commit_fptr) (struct sql_trans *tr, struct sql_change *c, ulng commit_ts, ulng oldest, ulng *active);/* commit/rollback changes */
 typedef int (*tc_cleanup_fptr) (sql_store store, struct sql_change *c, ulng oldest);	/* garbage collection, ie cleanup structures when possible */
 typedef void (*destroy_fptr)(sql_store store, sql_base *b);
 typedef int (*validate_fptr)(struct sql_trans *tr, sql_base *b, int delete);
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -19,15 +19,15 @@
 static int log_update_col( sql_trans *tr, sql_change *c);
 static int log_update_idx( sql_trans *tr, sql_change *c);
 static int log_update_del( sql_trans *tr, sql_change *c);
-static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
-static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
-static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest);
+static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest, ulng *active);
+static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest, ulng *active);
+static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts, ulng oldest, ulng *active);
 static int log_create_col(sql_trans *tr, sql_change *change);
 static int log_create_idx(sql_trans *tr, sql_change *change);
 static int log_create_del(sql_trans *tr, sql_change *change);
-static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
-static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
-static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest);
+static int commit_create_col(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active);
+static int commit_create_idx(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active);
+static int commit_create_del(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active);
 static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
 static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
 static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
@@ -389,7 +389,7 @@ segments2cs(sql_trans *tr, segments *seg
 
 /* TODO return LOG_OK/ERR */
 static void
-merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
+merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active)
 {
 	segment *cur = s->segs->h, *seg = NULL;
 	for (; cur; cur = cur->next) {
@@ -398,24 +398,45 @@ merge_segments(storage *s, sql_trans *tr
 				cur->oldts = 0;
 			cur->ts = commit_ts;
 		}
-		if (cur->ts <= oldest && cur->ts < TRANSACTION_ID_BASE) { /* possibly merge range */
-			if (!seg) { /* skip first */
-				seg = cur;
-			} else if (seg->end == cur->start && seg->deleted == cur->deleted) {
-				/* merge with previous */
-				seg->end = cur->end;
-				seg->next = cur->next;
-				if (cur == s->segs->t)
-					s->segs->t = seg;
-				if (commit_ts == oldest)
-					_DELETE(cur);
-				else
-					mark4destroy(cur, change, commit_ts);
-				cur = seg;
-			} else {
-				seg = cur; /* begin of new merge */
+		if (!seg) {
+			/* first segment */
+			seg = cur;
+		}
+		else if (seg->ts < TRANSACTION_ID_BASE) {
+			/* possible merge since both deleted flags are equal */
+			if (seg->deleted == cur->deleted && cur->ts < TRANSACTION_ID_BASE) {
+				int merge = 1;
+				for (int i = 0; active[i] != 0; i++) {
+					assert(active[i] != seg->ts && active[i] != cur->ts);
+
+					if (active[i] == tr->ts)
+						continue; /* pretent that committing transaction has already committed and is no longer active */
+					if (seg->ts < active[i] && cur->ts < active[i])
+						break;
+					if (seg->ts > active[i] && cur->ts > active[i])
+						continue;
+
+					assert((active[i] > seg->ts && active[i] < cur->ts) || (active[i] < seg->ts && active[i] > cur->ts));
+					/* cannot safely merge since there is an active transaction between the segments */
+					merge = false;
+					break;
+				}
+				/* merge segments */
+				if (merge) {
+					seg->end = cur->end;
+					seg->next = cur->next;
+					if (cur == s->segs->t)
+						s->segs->t = seg;
+					if (commit_ts == oldest)
+						_DELETE(cur);
+					else
+						mark4destroy(cur, change, commit_ts);
+					cur = seg;
+					continue;
+				}
 			}
 		}
+		seg = cur;
 	}
 }
 
@@ -3015,10 +3036,11 @@ log_create_col(sql_trans *tr, sql_change
 }
 
 static int
-commit_create_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest)
+commit_create_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest, ulng *active)
 {
 	int ok = LOG_OK;
 	(void)oldest;
+	(void)active;
 
 	if(!isTempTable(c->t)) {
 		sql_delta *delta = ATOMIC_PTR_GET(&c->data);
@@ -3036,12 +3058,12 @@ commit_create_col_( sql_trans *tr, sql_c
 }
 
 static int
-commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
+commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active)
 {
 	sql_column *c = (sql_column*)change->obj;
 	if (!tr->parent)
 		c->base.new = 0;
-	return commit_create_col_( tr, c, commit_ts, oldest);
+	return commit_create_col_( tr, c, commit_ts, oldest, active);
 }
 
 /* will be called for new idx's and when new index columns are created */
@@ -3121,10 +3143,11 @@ log_create_idx(sql_trans *tr, sql_change
 }
 
 static int
-commit_create_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest)
+commit_create_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest, ulng *active)
 {
 	int ok = LOG_OK;
 	(void)oldest;
+	(void)active;
 
 	if(!isTempTable(i->t)) {
 		sql_delta *delta = ATOMIC_PTR_GET(&i->data);
@@ -3141,12 +3164,12 @@ commit_create_idx_( sql_trans *tr, sql_i
 }
 
 static int
-commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
+commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active)
 {
 	sql_idx *i = (sql_idx*)change->obj;
 	if (!tr->parent)
 		i->base.new = 0;
-	return commit_create_idx_(tr, i, commit_ts, oldest);
+	return commit_create_idx_(tr, i, commit_ts, oldest, active);
 }
 
 static int
@@ -3358,7 +3381,7 @@ log_create_del(sql_trans *tr, sql_change
 }
 
 static int
-commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
+commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active)
 {
 	int ok = LOG_OK;
 	sql_table *t = (sql_table*)change->obj;
@@ -3371,21 +3394,21 @@ commit_create_del( sql_trans *tr, sql_ch
 		assert(ok == LOG_OK);
 		if (ok != LOG_OK)
 			return ok;
-		merge_segments(dbat, tr, change, commit_ts, commit_ts/* create is we are alone */ /*oldest*/);
+		merge_segments(dbat, tr, change, commit_ts, commit_ts, active);
 		assert(dbat->cs.ts == tr->tid);
 		dbat->cs.ts = commit_ts;
 		if (ok == LOG_OK) {
 			for(node *n = ol_first_node(t->columns); n && ok == LOG_OK; n = n->next) {
 				sql_column *c = n->data;
 
-				ok = commit_create_col_(tr, c, commit_ts, oldest);
+				ok = commit_create_col_(tr, c, commit_ts, oldest, active);
 			}
 			if (t->idxs) {
 				for(node *n = ol_first_node(t->idxs); n && ok == LOG_OK; n = n->next) {
 					sql_idx *i = n->data;
 
 					if (ATOMIC_PTR_GET(&i->data))
-						ok = commit_create_idx_(tr, i, commit_ts, oldest);
+						ok = commit_create_idx_(tr, i, commit_ts, oldest, active);
 				}
 			}
 			if (!tr->parent)
@@ -3523,12 +3546,13 @@ log_destroy_del(sql_trans *tr, sql_chang
 }
 
 static int
-commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
+commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active)
 {
 	(void)tr;
 	(void)change;
 	(void)commit_ts;
 	(void)oldest;
+	(void)active;
 	return 0;
 }
 
@@ -3974,12 +3998,13 @@ log_update_col( sql_trans *tr, sql_chang
 }
 
 static int
-commit_update_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest)
+commit_update_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest, ulng *active)
 {
 	int ok = LOG_OK;
 	sql_delta *delta = ATOMIC_PTR_GET(&c->data);
 
 	(void)oldest;
+	(void)active;
 	if (isTempTable(c->t)) {
 		if (commit_ts) { /* commit */
 			if (c->t->commit_action == CA_COMMIT || c->t->commit_action == CA_PRESERVE) {
@@ -4033,14 +4058,14 @@ tc_gc_rollbacked_storage( sql_store Stor
 
 
 static int
-commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
+commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active)
 {
 	int ok = LOG_OK;
 	sql_column *c = (sql_column*)change->obj;
 	sql_delta *delta = ATOMIC_PTR_GET(&c->data);
 
 	if (isTempTable(c->t))
-		return commit_update_col_(tr, c, commit_ts, oldest);
+		return commit_update_col_(tr, c, commit_ts, oldest, active);
 	if (commit_ts)
 		delta->cs.ts = commit_ts;
 	if (!commit_ts) { /* rollback */
@@ -4085,13 +4110,14 @@ log_update_idx( sql_trans *tr, sql_chang
 }
 
 static int
-commit_update_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest)
+commit_update_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest, ulng *active)
 {
 	int ok = LOG_OK;
 	sql_delta *delta = ATOMIC_PTR_GET(&i->data);
 	int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
 
 	(void)oldest;
+	(void)active;
 	if (isTempTable(i->t)) {
 		if (commit_ts) { /* commit */
 			if (i->t->commit_action == CA_COMMIT || i->t->commit_action == CA_PRESERVE) {
@@ -4114,14 +4140,14 @@ commit_update_idx_( sql_trans *tr, sql_i
 }
 
 static int
-commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
+commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active)
 {
 	int ok = LOG_OK;
 	sql_idx *i = (sql_idx*)change->obj;
 	sql_delta *delta = ATOMIC_PTR_GET(&i->data);
 
 	if (isTempTable(i->t))
-		return commit_update_idx_( tr, i, commit_ts, oldest);
+		return commit_update_idx_( tr, i, commit_ts, oldest, active);
 	if (commit_ts)
 		delta->cs.ts = commit_ts;
 	if (!commit_ts) { /* rollback */
@@ -4198,7 +4224,7 @@ commit_storage(sql_trans *tr, storage *d
 }
 
 static int
-commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
+commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active)
 {
 	int ok = LOG_OK;
 	sql_table *t = (sql_table*)change->obj;
@@ -4251,11 +4277,11 @@ commit_update_del( sql_trans *tr, sql_ch
 		ok = segments2cs(tr, dbat->segs, &dbat->cs);
 		assert(ok == LOG_OK);
 		if (ok == LOG_OK)
-			merge_segments(dbat, tr, change, commit_ts, oldest);
+			merge_segments(dbat, tr, change, commit_ts, oldest, active);
 		if (ok == LOG_OK && dbat == d && oldest == commit_ts)
 			ok = merge_storage(dbat);
 	} else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
-		merge_segments(dbat, tr, change, commit_ts, oldest);
+		merge_segments(dbat, tr, change, commit_ts, oldest, active);
 		ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat, commit_ts));
 	}
 	unlock_table(tr->store, t->base.id);
--- a/sql/storage/objectset.c
+++ b/sql/storage/objectset.c
@@ -607,7 +607,7 @@ tc_gc_objectversion(sql_store store, sql
 }
 
 static int
-tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest)
+tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts, ulng oldest, ulng *active_)
 {
 	objectversion *ov = (objectversion*)change->data;
 	if (commit_ts) {
@@ -615,6 +615,7 @@ tc_commit_objectversion(sql_trans *tr, s
 		ov->ts = commit_ts;
 		change->committed = commit_ts < TRANSACTION_ID_BASE ? true: false;
 		(void)oldest;
+		(void)active_;
 		if (!tr->parent)
 			change->obj->new = 0;
 	}
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -57,6 +57,19 @@ store_oldest(sqlstore *store)
 	return store->oldest;
 }
 
+static ulng *
+store_get_active(sqlstore *store)
+{
+
+	ulng *active = GDKmalloc(sizeof(ulng) * (store->active->cnt + 1));
+	node *cur = store->active->h;
+	for (int i = 0; i < store->active->cnt; i++, cur = cur->next) {
+		active[i] = ((sql_session*)cur->data)->tr->ts;
+	}
+	active[store->active->cnt] = 0;
+	return active;
+}
+
 static ulng
 store_oldest_pending(sqlstore *store)
 {
@@ -3624,13 +3637,16 @@ sql_trans_rollback(sql_trans *tr, bool c
 		store_lock(store);
 		ulng oldest = store_oldest(store);
 		ulng commit_ts = store_get_timestamp(store); /* use most recent timestamp such that we can cleanup savely */
+		ulng *active = store_get_active(store); /* get active transactions (to merge segments) */
 		for(node *n=nl->h; n; n = n->next) {
 			sql_change *c = n->data;
 
-			if (c->commit)
-				c->commit(tr, c, 0 /* ie rollback */, oldest);
+			if (c->commit) {
+				c->commit(tr, c, 0 /* ie rollback */, oldest, active);
+			}
 			c->ts = commit_ts;
 		}
+		GDKfree(active);
 		store_pending_changes(store, oldest);
 		for(node *n=nl->h; n; n = n->next) {
 			sql_change *c = n->data;
@@ -3974,15 +3990,18 @@ sql_trans_commit(sql_trans *tr)
 		if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent)
 			oldest = commit_ts;
 		store_pending_changes(store, oldest);
+		ulng *active = store_get_active(store); /* get active transactions (to merge segments) */
 		for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) {
 			sql_change *c = n->data;
 
-			if (c->commit && ok == LOG_OK)
-				ok = c->commit(tr, c, commit_ts, oldest);
+			if (c->commit && ok == LOG_OK) {
+				ok = c->commit(tr, c, commit_ts, oldest, active);
+			}
 			else
 				c->obj->new = 0;
 			c->ts = commit_ts;
 		}
+		GDKfree(active);
 		/* when directly flushing: flush logger after changes got applied */
 		if (flush) {
 			if (ok == LOG_OK) {
--- a/sql/test/concurrent/Tests/All
+++ b/sql/test/concurrent/Tests/All
@@ -1,2 +1,3 @@
 simple_select
 crash_on_concurrent_use.SF-1411926
+smart-segment-merge
new file mode 100644
--- /dev/null
+++ b/sql/test/concurrent/Tests/smart-segment-merge.test
@@ -0,0 +1,306 @@
+# INIT
+
+@connection(id=1, username=monetdb, password=monetdb)
+statement ok
+CREATE TABLE Test (k int);
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+1
+
+@connection(id=1)
+statement ok
+INSERT INTO Test SELECT value FROM generate_series(1, 11);
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+1
+
+
+# TEST INSERTS
+# the four uncommitted transactions create four new segments, appended to the tail of the segment list;
+# the first commit (id=1) does not merge any segments since active transactions prevent it;
+# the second commit (id=3) does not merge any segments since there is an uncommitted segment of id=2 between id=1 and id=3;
+# the third commit (id=4) is able to merge its segment plus the segment inserted by the transaction id=3,
+#  as they are next to each other and the active transaction is not able to read them;
+# the fourth commit is able to merge everything, as there are no active transactions and all segments are contiguous.
+
+@connection(id=1)
+statement ok
+begin transaction
+
+@connection(id=2, username=monetdb, password=monetdb)
+statement ok
+begin transaction
+
+@connection(id=3, username=monetdb, password=monetdb)
+statement ok
+begin transaction
+
+@connection(id=4, username=monetdb, password=monetdb)
+statement ok
+begin transaction
+
+@connection(id=1)
+statement ok
+INSERT INTO Test VALUES (11);
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+2
+
+@connection(id=2)
+statement ok
+INSERT INTO Test VALUES (12);
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+3
+
+@connection(id=3)
+statement ok
+INSERT INTO Test VALUES (13), (14);
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+4
+
+@connection(id=4)
+statement ok
+INSERT INTO Test VALUES (15);
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+5
+
+@connection(id=1)
+statement ok
+commit
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+5
+
+@connection(id=3)
+statement ok
+commit
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+5
+
+@connection(id=4)
+statement ok
+commit
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+4
+
+@connection(id=1)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,5,6,7,8,9,10,11,13,14,15
+
+@connection(id=2)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,5,6,7,8,9,10,12
+
+@connection(id=2)
+statement ok
+commit
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+1
+
+@connection(id=1)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
+
+
+# TEST DELETES
+# the first transaction is read only;
+# the second transaction deletes one row, splitting the segment;
+# the third transaction deletes another row, creating another split, but cannot merge as it is not contiguous with the second tx;
+# the fourth transaction deletes another row between the second and third txs, merging the segments.
+
+@connection(id=1)
+statement ok
+begin transaction
+
+@connection(id=2)
+statement ok
+DELETE FROM Test WHERE k = 5
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+3
+
+@connection(id=2)
+statement ok
+DELETE FROM Test WHERE k = 7
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+5
+
+@connection(id=1)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
+
+@connection(id=2)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,6,8,9,10,11,12,13,14,15
+
+@connection(id=2)
+statement ok
+DELETE FROM Test WHERE k = 6
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+3
+
+@connection(id=1)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,5,6,7,8,9,10,11,12,13,14,15
+
+@connection(id=2)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,8,9,10,11,12,13,14,15
+
+@connection(id=1)
+statement ok
+commit
+
+@connection(id=1)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,8,9,10,11,12,13,14,15
+
+
+# TEST FILLING HOLES
+# there are currently 3 segments, with the middle one referring to three deleted rows;
+# the first transaction is read-only;
+# the second transaction inserts a new row, which splits the deleted segment into two;
+#  it is not merged with the first segment due to the active transaction;
+# the third transaction inserts a new row, splitting the delete segment into another two;
+#  however, the two new inserted segments are merged, as they are contiguous and the active transactions
+#  cannot read them;
+# the first transaction commits but the segments remain the same since it is read-only;
+# the last transaction inserts another row and commits, merging everything back into one segment.
+
+@connection(id=1)
+statement ok
+begin transaction
+
+@connection(id=2)
+statement ok
+INSERT INTO TEST VALUES (16);
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+4
+
+@connection(id=2)
+statement ok
+INSERT INTO TEST VALUES (17);
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+4
+
+@connection(id=1)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,8,9,10,11,12,13,14,15
+
+@connection(id=2)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,16,17,8,9,10,11,12,13,14,15
+
+@connection(id=1)
+statement ok
+commit
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+4
+
+@connection(id=1)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,16,17,8,9,10,11,12,13,14,15
+
+@connection(id=2)
+statement ok
+INSERT INTO TEST VALUES (18);
+
+@connection(id=1)
+query I
+SELECT segments FROM sys.deltas('sys', 'test');
+----
+1
+
+@connection(id=1)
+query T
+SELECT listagg(k) FROM test;
+----
+1,2,3,4,16,17,18,8,9,10,11,12,13,14,15
+
+
+# CLEANUP
+
+@connection(id=1)
+statement ok
+DROP TABLE Test;