IoT and streaming in MonetDB

Internet-of-Things Database Server

With the expansion of the IoT market, database systems with support for streaming data have attracted increasing attention. In an IoT scenario, data is collected in the periphery of a sensor network, assembled and aggregated by message brokers, and delivered at the doorstep of the database system. There, it is fed into a dataflow-based processing pipeline to derive actionable knowledge or to become the basis for interactive data exploration. The size of a sensor network and the frequency at which data can be generated require the database system to be scalable and elastic.

MonetDB TimeTrails module

The MonetDB TimeTrails branch supports analytic IoT scenarios with continuous queries. Unlike regular SQL queries, continuous queries are SQL queries packaged in an SQL procedure for continuous execution, i.e. the procedure is automatically called whenever a triggering condition is met. The trigger can be the arrival of sufficient new data from the stream or a signal from a heartbeat clock. In this blog we illustrate how to use the IoT facilities offered by TimeTrails.

Streaming tables

Data delivered by the message brokers in IoT scenarios often requires immediate processing. In most cases, the raw data need not end up in the persistent store of the database. Instead, it is refined and aggregated. Moreover, modern message brokers manage the persistence of the raw data in a distributed and reliable fashion. In case of failures, they already provide methods to go back in time and start reprocessing. Redoing this work as part of a database transaction would be double work and a potential performance drain.

This leaves us with the notion of a STREAM table, common to most streaming databases. Stream tables are light versions of normal relational tables, often kept solely in memory and not subject to transaction management. They are the end points where IoT events are delivered. The following SQL syntax specifies how a streaming table can be created in MonetDB:

CREATE STREAM TABLE tname (... columns ...)
    [ SET [ WINDOW positive_number ][ STRIDE positive_number ] ]

The columns follow the regular definition of persistent tables. Primary key and foreign key constraints are ignored, as a reaction to their violation would be ill-defined. The WINDOW and STRIDE properties determine when and how a continuous query interested in this table is triggered. The WINDOW parameter sets the number of tuples that must be present in the table to trigger a continuous query on it. If not provided (the default), any continuous query using this stream table will be triggered by an interval timer instead. The STRIDE parameter sets the number N of tuples to be deleted from the stream table at the end of a continuous query invocation. The default action is to remove all tuples seen in the query invocation; otherwise the oldest N tuples are removed. Setting N to zero keeps all tuples until they are explicitly deleted by a continuous query. The stride size cannot be larger than the window size, to avoid events being received but never processed. The parameters can be changed later with the following SQL commands:

ALTER STREAM TABLE tname SET WINDOW positive_number
ALTER STREAM TABLE tname SET STRIDE positive_number
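
For example (the table and column names below are hypothetical, not part of the original test cases), a stream table that fires its continuous queries once 50 sensor readings have arrived, and discards the oldest 10 tuples after each invocation, could be declared and later adjusted as follows:

-- hypothetical stream of sensor readings: trigger after 50 tuples, drop the oldest 10 afterwards
CREATE STREAM TABLE readings (sensor_id INT, val REAL, ts TIMESTAMP)
    SET WINDOW 50 STRIDE 10;

-- later, shrink the window; the stride may not exceed the window size
ALTER STREAM TABLE readings SET WINDOW 20;
ALTER STREAM TABLE readings SET STRIDE 20;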

Continuous queries

The semantics of continuous queries are encapsulated in ordinary SQL user-defined functions (UDFs) and procedures (UDPs). They differ from regular UDFs/UDPs only in the way they are called and in that they use stream tables as input/output. An existing SQL UDF/UDP can be registered with the continuous query scheduler using the command:

START CONTINUOUS { PROCEDURE | FUNCTION } fname '(' arguments ')'
    [ WITH [ HEARTBEAT positive_number ]
    [ CLOCK literal ]
    [ CYCLES positive_number ]]
    [ AS tagname ]

The scheduler is based on a Petri-net model, which activates the execution of a continuous UDF/UDP when all its input triggers are satisfied. The HEARTBEAT parameter indicates the number of milliseconds between calls to the continuous query. If not set (the default), the stream tables used in the UDF/UDP will be scanned, making it a tuple-based continuous query instead. It is not possible to set both the HEARTBEAT and WINDOW parameters at the same time, i.e. only one of the temporal and spatial conditions may be set. If neither is set, the continuous query will be triggered in each Petri-net cycle. The CYCLES parameter specifies the number of times the continuous query will be run before being removed from the Petri-net. If not indicated (the default), the continuous query will run forever. The CLOCK parameter specifies the wall-clock time at which the continuous query starts; otherwise it starts immediately upon registration. The 'literal' can be a timestamp (e.g. timestamp '2017-08-29 15:05:40'), which starts the continuous query at that point in time; a date (e.g. date '2017-08-29'), which starts it at midnight of that day; a time value (e.g. time '15:05:40'), which starts it today at that time; or simply a UNIX timestamp integer with millisecond precision.
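
As a minimal sketch (the procedure name aggregate10s and the tag aggr are made up for illustration, and the stream tables it reads are assumed to have no WINDOW set, since HEARTBEAT and WINDOW cannot be combined), a continuous procedure could be registered to run every 10 seconds, 360 times in total, starting at a given wall-clock time:

-- call the procedure every 10000 ms, 360 times, starting at the given timestamp
START CONTINUOUS PROCEDURE aggregate10s()
    WITH HEARTBEAT 10000
         CLOCK timestamp '2017-08-29 15:05:40'
         CYCLES 360
    AS aggr;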

The 'tagname' parameter identifies a continuous query. In this way, an SQL UDF/UDP with different arguments can be registered as different continuous queries. If a tagname is not provided, the function/procedure name is used instead. After registering a continuous query, it is possible to pause, resume or stop it. The syntax is as follows:

-- stop and remove a continuous query from the Petri-net.
STOP CONTINUOUS tagname;

-- pause a continuous query from the Petri-net but do not remove it.
PAUSE CONTINUOUS tagname;

-- resume a paused continuous query.
-- If the HEARTBEAT and CYCLES parameters are not provided (default),
-- then the previous registered values will be used.
RESUME CONTINUOUS tagname
    [ WITH [ HEARTBEAT positive_number ]
           [ CLOCK literal ]
           [ CYCLES positive_number ] ]
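
Continuing the hypothetical aggr registration sketched above, a continuous query could be paused during maintenance and later resumed with, for instance, a slower heartbeat:

-- suspend the query without removing it from the Petri-net
PAUSE CONTINUOUS aggr;

-- resume with a 60-second heartbeat instead of the previously registered one
RESUME CONTINUOUS aggr WITH HEARTBEAT 60000;

-- eventually stop it and remove it from the Petri-net
STOP CONTINUOUS aggr;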

The following SQL commands apply to all continuous queries at once:

  • STOP ALL CONTINUOUS - stop and remove all continuous queries from the Petri-net.

  • PAUSE ALL CONTINUOUS - pause all continuous queries in the Petri-net.

  • RESUME ALL CONTINUOUS - resume all continuous queries in the Petri-net with the previously registered HEARTBEAT and CYCLES values.

During the first iteration of a continuous function, a streaming table is created under the “cquery” schema to store the outputs of the function during its lifetime in the scheduler. This streaming table will be dropped once the continuous function is deregistered from the scheduler or the MonetDB server restarts.

Implementation notes

Some implementation notes:

  • All continuous queries are stopped when the MonetDB server shuts down. The user must start them again manually after the server restarts.

  • Streaming tables are volatile for better performance under large workloads. This means that upon restart of the database server their data is lost.

  • A streaming table cannot be dropped while there is a continuous query using it. The same condition holds for registered UDFs.

  • Unlike regular SQL tables, the SQL catalog properties of a streaming table, including its columns, cannot be altered. The user must drop the table and recreate it with the desired changes.

  • The current scheduler implementation is agnostic of transaction management. This means that if a CQ was started, paused, resumed or stopped inside a transaction that was later rolled back, those changes are not reverted.

  • If an error happens during a single execution, the CQ is paused automatically. The error can be checked with a cquery.status() or cquery.log() system catalog call, as sketched below.
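
A minimal sketch of such a check, assuming cquery.status() and cquery.log() are table-producing functions that can be queried with a regular SELECT (the exact result columns are not documented in this post):

-- inspect the scheduler state and error messages of the registered CQs
SELECT * FROM cquery.status();
SELECT * FROM cquery.log();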

Demonstration

With the following SQL statements (taken from the test cases that come with the system), one can observe TimeTrails at work:

CREATE STREAM TABLE inputStream (val INT) SET WINDOW 4 STRIDE 2;

-- calculate the average value of the window during execution
CREATE FUNCTION calculateAverage() RETURNS REAL
BEGIN
    RETURN SELECT AVG(val) AS calc FROM inputStream;
END;

Start the continuous query in the background.

START CONTINUOUS FUNCTION calculateAverage() AS calcavg;

INSERT INTO inputStream VALUES (33), (29), (30), (32);

-- The query was performed only once
SELECT result FROM cquery.calcavg;
result
31
INSERT INTO inputStream VALUES (30);

-- The query was not performed again yet
SELECT result FROM cquery.calcavg;
result
31
INSERT INTO inputStream VALUES (33);

-- The query was performed again
SELECT result FROM cquery.calcavg;
result
31
31.25

The query will continue running in the background until it’s manually stopped or the server shuts down.

STOP CONTINUOUS calcavg;
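
After the continuous query has been stopped, the function and the stream table can be removed again. This is a sketch assuming the regular DROP statements apply; recall that dropping is refused while a continuous query still uses them:

-- cleanup, only possible after STOP CONTINUOUS calcavg
DROP FUNCTION calculateAverage;
DROP TABLE inputStream;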

Developer and support

The experimental TimeTrails branch was developed under the lead of Pedro Ferreira, a software developer at MonetDB Solutions.