Streaming

Streaming mk Thu, 06/09/2011 - 00:53

The DataCell stream processing facilities of MonetDB are best illustrated with a minimal example: a sensor sends events to the database, a continuous query picks them up, and the results are sent out over a stream to an actuator. To run the example, you need a MonetDB binary with DataCell functionality enabled. This ensures that the required libraries are loaded and that the SQL catalog knows about the stream-specific functions and operators. It also creates the DataCell schema, which is used to collect compiled continuous queries. The final step in the startup is to enable the DataCell optimizer pipeline.

sql> set optimizer = 'datacell_pipe';
sql> create table datacell.bsktin (id integer, tag timestamp, payload integer);
sql> create table datacell.bsktout (like datacell.bsktin);

sql> call datacell.receptor('datacell.bsktin', 'localhost', 50500);
sql> call datacell.emitter('datacell.bsktout', 'localhost', 50600);
sql> call datacell.query('datacell.pass', 'insert into datacell.bsktout select * from datacell.bsktin;');

sql> call datacell.resume();

After these simple steps, it suffices to hook up a sensor to send events to the DataCell and an actuator to listen for response events. The result of this experiment is a large number of randomly generated events passing through the stream engine in bulk.

$ nc -l -u localhost 50600 &
$ sensor --host=localhost --port=50500 --events=1000 --columns=3 &

The Linux netcat (nc) can be used as a strawman actuator to monitor the output of the DataCell. The distribution comes with a sensor simulator and an actuator simulator. The DataCell source code contains a fire detection scenario that exercises the DataCell and can serve as a basis for your own application.
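The event stream produced by the sensor can be approximated with a few lines of Python. This is a minimal sketch, not the simulator's actual code: the function name make_events, the seed parameter and the use of an increasing integer tick as the second field are assumptions, chosen only to resemble the three-column output shown on this page.

```python
import random


def make_events(count, columns=3, seed=None):
    """Yield CSV lines of the form: id, tick, random integer payload(s).

    Assumption: `columns` counts the id and tick fields as well, so
    columns=3 yields exactly one payload value per event.
    """
    rng = random.Random(seed)
    tick = 0
    for event_id in range(1, count + 1):
        # Hypothetical clock: a strictly increasing integer tick.
        tick += rng.randint(1, 1000)
        payload = [rng.randrange(2**31) for _ in range(columns - 2)]
        yield ",".join(str(value) for value in [event_id, tick, *payload])


if __name__ == "__main__":
    for line in make_events(5, columns=3, seed=42):
        print(line)
```

Each generated line could then be written to a socket or a file, depending on how the receptor is configured.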

The example reconsidered
The DataCell operates on relational tables. The first action is to identify all such tables and redefine them as baskets by attaching them to receptors, emitters, or intermittent baskets.

sql> call datacell.receptor('datacell.bsktin', 'localhost', 50500);
sql> call datacell.emitter('datacell.bsktout', 'localhost', 50600);

A receptor thread is attached to the 'bsktin' basket. It listens, by default over a TCP stream, on port 50500 and receives tuples in CSV format. The number of fields and their lexical conventions must comply with the corresponding table definition. The format follows the same rules as a COPY INTO command over a CSV file. The receptor mode is either active or passive. In passive mode, the default setting, the sensor takes the initiative in contacting the streaming engine to deposit events. In active mode, the streaming engine contacts the sensor for more events. Note that the receptor starts running only after you issue datacell.resume('bsktin') or datacell.resume(). The calls shown are actually a shorthand for the more verbose version, where protocol and mode are made explicit.

sql> call datacell.receptor('datacell.bsktin', 'localhost', 50500,'tcp','active');
sql> call datacell.emitter('datacell.bsktout', 'localhost', 50600,'udp','passive');
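The requirement that incoming CSV fields match the table definition can be illustrated with a small Python sketch. It is only an approximation of what a receptor has to do: the names SCHEMA, parse_event and load are hypothetical, the column types mirror the (id integer, tag timestamp, payload integer) table of the example, and the ISO-style timestamp text is an assumption about the lexical form.

```python
from datetime import datetime

# Converters mirroring the example table: (id integer, tag timestamp, payload integer).
SCHEMA = [("id", int), ("tag", datetime.fromisoformat), ("payload", int)]


def parse_event(line, schema=SCHEMA):
    """Convert one CSV line into a row, or raise ValueError if it does not fit."""
    fields = line.strip().split(",")
    if len(fields) != len(schema):
        raise ValueError(f"expected {len(schema)} fields, got {len(fields)}")
    return {name: convert(text.strip())
            for (name, convert), text in zip(schema, fields)}


def load(lines, schema=SCHEMA):
    """Split incoming lines into accepted rows and rejected (line, reason) pairs."""
    basket, rejected = [], []
    for line in lines:
        try:
            basket.append(parse_event(line, schema))
        except ValueError as exc:
            rejected.append((line, str(exc)))
    return basket, rejected
```

A line with the wrong number of fields or an unparsable value ends up in the rejected list instead of the basket.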

The sensor simulator is geared towards testing the infrastructure and takes shortcuts on the event formats sent. Currently, it primarily generates event records starting with an optional event identifier, followed by an optional timestamp and a payload of random integer values. To generate a test file with 100 events for the example, you can rely mostly on the default settings. Hooking up the sensor to the stream engine merely requires a hostname and port instead of a file argument. A glimpse of the sensor interaction can be obtained using --trace, which writes the events to standard output or to a specific file. The sensor simulator asks for user input before it exits. This way, the receiving side can pick up all events and is not confronted with a broken UDP channel.

$ sensor --events=100 --protocol=debug --columns=3
1,306478,1804289383
... 98 more ...
100,137483,1956297539
$ sensor --host=localhost --port=50500 --events=100 --columns=3

An alternative scheme is to replay an event log using the --file and --replay options. The sensor reads each event, optionally waits for a fixed delay (--delay=<milliseconds>), and sends it to the receptor. An exact (time) replay requires identifying the column with the temporal information, using the option --time=<field index>.
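The difference between a fixed-delay replay and an exact time replay can be sketched in Python. This is an illustration under stated assumptions, not the simulator's implementation: the function replay and its parameters are hypothetical, the time column is assumed to hold integer milliseconds, and the field index is 0-based here.

```python
import time


def replay(rows, send, delay_ms=None, time_index=None, sleep=time.sleep):
    """Send CSV rows in order, pausing between them.

    delay_ms:   fixed pause before every event.
    time_index: 0-based column holding the event time in milliseconds;
                the gaps between consecutive events are reproduced.
    """
    previous = None
    for row in rows:
        if time_index is not None:
            current = int(row.split(",")[time_index])
            if previous is not None:
                # Wait as long as the original gap between the two events.
                sleep(max(0, current - previous) / 1000.0)
            previous = current
        elif delay_ms:
            sleep(delay_ms / 1000.0)
        send(row)
```

Passing a custom sleep function, as the last parameter allows, makes it possible to inspect the computed pauses without actually waiting.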

After this step, the events have been picked up by the receptor and added to the basket datacell.bsktin. This table can be queried like any other table, but be aware that it may be emptied concurrently. The next step is to define a continuous query, which in this case passes the input received to the output channel. Reception and emission can be temporarily interrupted using the datacell.pause(objectname) operation.

After registration of the query, the datacell module contains the necessary optimized code for the continuous query processing. The scheduler is subsequently restarted using datacell.resume(), which moves the data from bsktin into bsktout when it arrives. You can check the result using ordinary SQL queries over the table-producing functions datacell.receptors(), datacell.emitters(), datacell.baskets() and datacell.queries().

sql> call datacell.query('datacell.pass', 'insert into datacell.bsktout select * from datacell.bsktin;');

 

Architecture Overview

Architecture Overview mk Wed, 08/15/2012 - 21:51

The DataCell Architecture
The DataCell approach is easily understood using the previously mentioned example. The sensor program simulates a real-world sensor that emits events at regular intervals, e.g. temperature, humidity, or noise readings. The actuator is a device simulator that is controlled by the events it receives, e.g. a fire alarm. The sensors and actuators work independently. They are typically proprietary devices that communicate with a controlling station over a wired network. The only requirement of the DataCell is that devices can communicate using the UDP protocol and, by default, deliver events in CSV, the most efficient event message format. Alternative message formats can readily be supported by extending the formats recognized by the adapters or by placing a simple filter between the device and the DataCell.

Baskets
The basket is the key data structure of the streaming engine. Its role is to hold a portion of an event stream, also denoted as an event window. It is represented as a temporary main-memory table. Unlike other stream systems, there is no a priori order or fixed window size. The basket is simply a (multi-)set of event records received from an adapter or events ready to be shipped to an actuator. There is no persistency and no transaction management over the baskets. If the content of a basket should survive the session, it should be inserted into a normal table. The baskets can be queried with SQL like any other table, but concurrent actions may leave you with a mostly empty table to look at.
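The behaviour described here, an unordered in-memory collection that a consumer may empty at any moment, can be mimicked in Python. The class Basket below is a hypothetical illustration and says nothing about how MonetDB stores baskets internally.

```python
import threading


class Basket:
    """In-memory multiset of events shared by producers and consumers."""

    def __init__(self):
        self._events = []
        self._lock = threading.Lock()

    def put(self, events):
        """Add a batch of events; duplicates are allowed."""
        with self._lock:
            self._events.extend(events)

    def drain(self):
        """Atomically take all events, leaving the basket empty."""
        with self._lock:
            taken, self._events = self._events, []
        return taken

    def snapshot(self):
        """Return a copy of the current content, as an ad hoc query would see it."""
        with self._lock:
            return list(self._events)
```

A snapshot taken right after another thread has called drain() is empty, which corresponds to the "mostly empty table" an interactive query may observe.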

Adapters
The receptor and emitter adapters are the interface units of the DataCell for interacting with sensors and actuators. Both communicate with their environment through a channel. The default channel is a UDP connection, chosen for speed. By default the receptor is a passive thread, which opens a channel and waits for events to arrive. In contrast, the emitter is an active thread, which immediately pushes events onto the identified channel. Hooks have been created to change the roles, e.g. a receptor that polls a device and an emitter that waits for polling actuators.

Events that cannot be parsed are added to the corresponding basket as an error. All errors collected can be inspected using the table-producing function datacell.errors().
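The passive and active roles over a UDP channel can be sketched with Python's standard socket module. The helper names open_receptor, emit and receive are hypothetical, and the sketch runs both ends in one process over the loopback interface, using a port chosen by the operating system rather than the ports of the example.

```python
import socket


def open_receptor(host="127.0.0.1", port=0):
    """Passive side: bind a UDP socket and wait for events (port 0 = any free port)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((host, port))
    sock.settimeout(2.0)  # do not block forever if nothing arrives
    return sock


def emit(events, address):
    """Active side: push each event onto the channel as one datagram."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        for event in events:
            sock.sendto((event + "\n").encode("ascii"), address)


def receive(sock, count):
    """Collect `count` datagrams and return them as text lines."""
    received = []
    for _ in range(count):
        data, _sender = sock.recvfrom(4096)
        received.append(data.decode("ascii").rstrip("\n"))
    return received
```

Because UDP gives no delivery or ordering guarantees, a real deployment would have to tolerate lost or reordered events; on the loopback interface this rarely shows.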

Continuous queries
The continuous queries are expressed as ordinary SQL queries, in which previously declared basket tables are recognized by the DataCell optimizer. Access to these tables is replaced by the optimizer, and interaction with the adapters is regulated with a locking scheme. Mixing basket tables and persistent tables is allowed. For convenience, multiple SQL statements can be encapsulated in an SQL procedure, so that the events from one basket are delivered to multiple destination baskets.
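The effect of one scheduler step of a continuous query that feeds several destinations can be pictured in Python. The function run_query and its (predicate, destination) routing list are hypothetical; the sketch only mirrors the idea of consuming a basket once and delivering the derived events to multiple baskets.

```python
def run_query(source, routes):
    """Drain `source` once and append each event to every accepting destination.

    routes: list of (predicate, destination_list) pairs.
    Returns the number of events consumed from the source basket.
    """
    # Take all pending events and empty the source basket in place.
    events, source[:] = list(source), []
    for event in events:
        for accept, destination in routes:
            if accept(event):
                destination.append(event)
    return len(events)


if __name__ == "__main__":
    bsktin = [{"id": i} for i in range(1, 7)]
    everything, even_only = [], []
    run_query(bsktin, [(lambda e: True, everything),
                       (lambda e: e["id"] % 2 == 0, even_only)])
    print(len(everything), [e["id"] for e in even_only])
```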

Continuous queries often rely on control over the minimum/maximum number of events to consider when the query is executed. This information is expressed as an ordinary predicate in the where clause. The following predefined predicates are supported. They inform the DataCell scheduler when the next action should be taken. They do not affect the current query, which allows for dynamic behavior; for example, the window slide size can be calculated with a query. It also means that either a startup query is needed to inform the scheduler the first time, or the properties must be set explicitly using datacell.basket() and datacell.beat() calls.

datacell.threshold(B,N): the query is only executed when basket B holds at least N events
datacell.window(B,M,S): extract a window of at most size M and slide with size S afterwards
datacell.window(B,T,Ts): extract a window based on a temporal interval of size T, followed by a stride Ts
datacell.beat(B,T): the next query is executed after a delay of T milliseconds (excluding query execution time)

The sliding-window constraints are mutually exclusive: a window slides either by a number of events or by a time interval. For time slicing, the first timestamp column in the basket is used as the frame of reference. All other temporal columns are treated as ordinary attributes.
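One possible reading of the count-based predicates can be written down in Python. The functions ready and take_window are hypothetical and only model the descriptions of datacell.threshold(B,N) and datacell.window(B,M,S) given above; the actual scheduler may differ in details such as what happens when fewer than M events are present.

```python
def ready(basket, threshold):
    """Threshold rule: the query may run only if enough events are present."""
    return len(basket) >= threshold


def take_window(basket, size, stride):
    """Hand at most `size` events to the query, then slide by `stride` events.

    Returns (window, remaining). With stride < size, consecutive windows overlap.
    """
    window = basket[:size]
    remaining = basket[stride:]
    return window, remaining


if __name__ == "__main__":
    events = list(range(1, 8))
    while ready(events, 4):
        window, events = take_window(events, size=4, stride=2)
        print(window)
```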

Stream catalog

Stream catalog mk Sun, 01/29/2012 - 16:41

The status of the DataCell is mapped onto a series of table-producing SQL functions. It can be queried using datacell.baskets(), datacell.receptors(), datacell.emitters() and datacell.queries().

sql>select * from datacell.receptors();
+-----------------+-----------+-------+----------+---------+---------+----------------------------+--------+----------+---------+
| nme             | host      | port  | protocol | mode    | status  | lastseen                   | cycles | received | pending |
+=================+===========+=======+==========+=========+=========+============================+========+==========+=========+
| datacell.bsktin | localhost | 50500 | TCP      | passive | running | 2012-08-15 19:31:28.000000 |      2 |       20 |       0 |
+-----------------+-----------+-------+----------+---------+---------+----------------------------+--------+----------+---------+
1 tuple (1.800ms)
sql>select * from datacell.emitters();
+------------------+-----------+-------+----------+--------+---------+----------------------------+--------+------+---------+
| nme              | host      | port  | protocol | mode   | status  | lastsent                   | cycles | sent | pending |
+==================+===========+=======+==========+========+=========+============================+========+======+=========+
| datacell.bsktout | localhost | 50600 | UDP      | active | running | 2012-08-15 19:31:28.000000 |      2 |   10 |       0 |
+------------------+-----------+-------+----------+--------+---------+----------------------------+--------+------+---------+
1 tuple (1.725ms)

The receptors and emitters are qualified by their communication protocol and mode. The last time they received or sent events is shown. The events not yet handled by a continuous query are denoted as pending.


sql>select * from datacell.baskets();
+---------------------+-----------+---------+-----------+-----------+------------+------+----------------------------+--------+
| nme                 | threshold | winsize | winstride | timeslice | timestride | beat | seen                       | events |
+=====================+===========+=========+===========+===========+============+======+============================+========+
| datacell.bsktmiddle |         0 |       0 |         0 |         0 |          0 |    0 | 2012-08-15 19:31:28.000000 |      0 |
| datacell.bsktin     |         0 |       0 |         0 |         0 |          0 |    0 | 2012-08-15 19:31:28.000000 |      0 |
| datacell.bsktout    |         0 |       0 |         0 |         0 |          0 |    0 | 2012-08-15 19:31:28.000000 |      0 |
+---------------------+-----------+---------+-----------+-----------+------------+------+----------------------------+--------+
3 tuples (1.639ms)
sql>select * from datacell.queries();
+-----------------+---------+----------------------------+--------+--------+------+-------+---------------------------------------------------------------------------------+
| nme             | status  | lastrun                    | cycles | events | time | error | def                                                                             |
+=================+=========+============================+========+========+======+=======+=================================================================================+
| datacell.pass   | running | 2012-08-15 19:31:28.000000 |      6 |     20 |  613 |       | insert into datacell.bsktmiddle select * from datacell.bsktin;                  |
| datacell.filter | running | 2012-08-15 19:31:28.000000 |      4 |      7 |  653 |       | insert into datacell.bsktout select * from datacell.bsktmiddle where id %2 = 0; |
+-----------------+---------+----------------------------+--------+--------+------+-------+---------------------------------------------------------------------------------+

The baskets have properties used by the scheduler for emptying them. The number of pending events is shown. The continuous queries are marked with how often they have been selected for execution, the total number of events taken from all input baskets, the total execution time, and their definition.

Sensor simulator

Sensor simulator mk Sat, 01/28/2012 - 17:47

The sensor simulator is geared towards testing the total infrastructure and takes shortcuts on the event formats sent. Currently, it primarily generates event records starting with an optional event identifier, followed by an optional timestamp and a payload of random integer values.

sensor [options]
--host=<host name>, default=localhost
--port=<portnr>, default=50500
--sensor=<name>
--protocol=<name> udp or tcp (default)
--increment=<number>, default=1
--timestamp, default=on
--columns=<number>, default=1
--events=<events length>, (-1=forever,>0), default=1
--file=<data file>
--replay use file or standard input
--time=<column> where to find the exact time

--batch=<batchsize> , default=1
--delay=<ticks> interbatch delay in ms, default=1
--trace=<trace> interaction
--server run as a server
--client run as a client


To generate a test file with 100 events for the example, you can rely mostly on the default settings. Hooking up the sensor to the stream engine merely requires a hostname and port instead of a file argument. A glimpse of the sensor interaction can be obtained using the 'debug' protocol, which writes the events to standard output or to a specific file. The status of the DataCell can be checked with datacell.dump(), which now shows a hundred events gathered in the basket bsktin. The sensor simulator asks for user input before it exits. This way, the receiving side can pick up all events and is not confronted with a broken UDP channel.

$ sensor --events=100 --protocol=debug --columns=3
1,306478,1804289383
... 98 more ...
100,137483,1956297539
$ sensor --host=localhost --port=50500 --events=100 --columns=3

Actuator simulator

Actuator simulator mk Sat, 01/28/2012 - 17:40

To test the DataCell, the distribution contains a simple event simulator. It generates a stream of MonetDB tuple values containing only random integers. Each tuple starts with the timestamp at which it was created.

The actuator simulator provides the following options:

actuator [options]
--host=<host name>, default localhost
--port=<portnr>, default 50600
--protocol=<name>  either tcp/udp, default tcp
--actuator=<actuator name> to identify the event received
--server run as a server (default)
--client run as a client
--events=<number>  number of events to receive
--statistics=<number>  show statistics after a series of events
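The interplay of the --events and --statistics options can be pictured with a short Python loop. This is a sketch under assumptions: the function consume, its parameters and the report text are hypothetical, since the format of the statistics printed by the real actuator is not described here.

```python
import time


def consume(lines, events=None, statistics=None, report=print, clock=time.monotonic):
    """Count incoming events, reporting progress every `statistics` events.

    events:     stop after this many events (None = read until the input ends).
    statistics: report after every this many events (None = never).
    """
    start = clock()
    received = 0
    for _line in lines:
        received += 1
        if statistics and received % statistics == 0:
            elapsed = clock() - start
            report(f"{received} events in {elapsed:.3f}s")
        if events is not None and received >= events:
            break
    return received
```

The lines argument can be any iterable of received events, for example a file object wrapped around a socket.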