
Streaming

The DataCell stream processing facilities of MonetDB are best illustrated with a minimal example: a sensor sends events to the database, a continuous query picks them up, and the results are sent out over a stream towards an actuator. To run the example, you need a MonetDB binary with DataCell functionality enabled. This ensures that the required libraries are loaded and that the SQL catalog knows about the stream-specific functions and operators. It also creates the DataCell schema, which is used to collect compiled continuous queries. The final step in the startup is to enable the DataCell optimizer pipeline.

sql> set optimizer = 'datacell_pipe';
sql> create table datacell.bsktin (id integer, tag timestamp, payload integer);
sql> create table datacell.bsktout (like datacell.bsktin);

sql> call datacell.receptor('datacell.bsktin', 'localhost', 50500);
sql> call datacell.emitter('datacell.bsktout', 'localhost', 50600);
sql> call datacell.query('datacell.pass', 'insert into datacell.bsktout select * from datacell.bsktin;');
sql> call datacell.resume();

After these simple steps, it suffices to hook up a sensor to send events to the DataCell and to hook up an actuator to listen for response events. The result of this experiment will be a large number of randomly generated events passing through the stream engine in bulk.

$ nc -l -u localhost 50600 &
$ sensor --host=localhost --port=50500 --events=1000 --columns=3 &

The Linux netcat (nc) utility can serve as a strawman actuator to monitor the output of the DataCell. The distribution comes with a sensor and an actuator simulator. The DataCell source code contains a fire detection scenario that exercises the DataCell and can serve as a basis for your own application.
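Where netcat is not available, a few lines of Python can play the same strawman role. The sketch below is not part of the MonetDB distribution; the function names `open_listener` and `collect_events` are hypothetical, and it assumes that the emitter delivers newline-terminated CSV records as UDP datagrams, as the `nc -l -u` invocation above suggests.

```python
import socket


def open_listener(host="localhost", port=50600):
    """Bind a UDP socket on the port the emitter is assumed to write to."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((host, port))
    return sock


def collect_events(sock, max_events=10, timeout=5.0):
    """Read datagrams until at least max_events lines arrive or the timeout expires."""
    events = []
    sock.settimeout(timeout)
    try:
        while len(events) < max_events:
            data, _sender = sock.recvfrom(65535)
            # One datagram may carry several newline-terminated events.
            events.extend(line for line in data.decode("utf-8").splitlines() if line)
    except socket.timeout:
        pass  # No more events within the timeout; return what was collected.
    return events
```

Calling `collect_events(open_listener())` before starting the sensor returns the first events as a list of CSV strings, which is enough to confirm that tuples flow from bsktin to bsktout.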

The example reconsidered
The DataCell operates on relational tables. The first action is to identify all such tables and redefine them as baskets by attaching them to receptors, emitters, or intermittent baskets.

sql> call datacell.receptor('datacell.bsktin', 'localhost', 50500);
sql> call datacell.emitter('datacell.bsktout', 'localhost', 50600);

A receptor thread is attached to the 'bsktin' basket. It receives tuples in CSV format on port 50500, by default over a TCP stream. The number of fields and their lexical conventions should comply with the corresponding table definition. The format follows the same semantics as a COPY INTO command over a CSV file. The receptor mode is either active or passive. In passive mode, the default setting, the sensor takes the initiative in contacting the streaming engine to deposit events. In active mode, the streaming engine contacts the sensor for more events. Note that the receptor only starts running after you issue the datacell.resume('bsktin') or datacell.resume() operation. The calls shown are a shorthand for the more verbose version, where protocol and mode are made explicit.

sql> call datacell.receptor('datacell.bsktin', 'localhost', 50500,'tcp','active');
sql> call datacell.emitter('datacell.bsktout', 'localhost', 50600,'udp','passive');
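To make the passive receptor mode concrete, the following sketch shows what a hand-written sensor could look like: it opens the TCP connection itself and deposits one CSV line per event. This is an illustration under assumptions, not the bundled sensor simulator. The names `format_event` and `send_events` are hypothetical, and the timestamp is assumed to be written in the `YYYY-MM-DD HH:MM:SS` form that a CSV load into a timestamp column would normally accept.

```python
import socket
from datetime import datetime


def format_event(event_id, tag, payload):
    """Render one event as a CSV line matching (id integer, tag timestamp, payload integer)."""
    # Assumption: the timestamp column accepts 'YYYY-MM-DD HH:MM:SS' text.
    return f"{event_id},{tag.strftime('%Y-%m-%d %H:%M:%S')},{payload}\n"


def send_events(events, host="localhost", port=50500):
    """Connect to a passive receptor and deposit the events, one CSV line each."""
    with socket.create_connection((host, port)) as conn:
        for event_id, tag, payload in events:
            conn.sendall(format_event(event_id, tag, payload).encode("utf-8"))
```

A call such as `send_events([(1, datetime.now(), 42)])` would deliver a single tuple to the receptor declared on port 50500, provided the field count and types match the basket definition.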

The sensor simulator is geared towards testing the infrastructure and takes shortcuts in the event formats it sends. Currently, it primarily generates event records that start with an optional event identifier, followed by an optional timestamp and a payload of random integer values. To generate a test file with 100 events for the example, you can rely mostly on the default settings. Hooking up the sensor to the stream engine merely requires a hostname and port instead of a file argument. A glimpse of the sensor interaction can be obtained using --trace, which writes the events to standard output or to a specific file. The sensor simulator asks for user input before it exits. This way, the receiving side can pick up the events without being confronted with a possibly broken UDP channel.

$ sensor --events=100 --protocol=debug --columns=3
1,306478,1804289383
... 98 more ...
100,137483,1956297539
$ sensor --host=localhost --port=50500 --events=100 --columns=3

An alternative scheme is to replay an event log using the --file and --replay options. The sensor reads each event, optionally waits for a fixed delay (--delay=<milliseconds>), and sends it to the receptor. An exact (time) replay calls for identifying the column with the temporal information, i.e. using the option --time=<field index>.
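The two replay schemes differ only in how the pause before each event is computed. The sketch below illustrates that difference; it is not the simulator's implementation. The names `replay_schedule` and `replay` are hypothetical, the time column is assumed to hold integer milliseconds, and the field index is taken to be zero-based, neither of which the text above specifies.

```python
import time


def replay_schedule(lines, time_index=None, fixed_delay_ms=0):
    """Yield (delay_ms, line) pairs for a fixed-delay or a time-exact replay."""
    previous = None
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # Skip blank lines in the event log.
        if time_index is None:
            delay_ms = fixed_delay_ms
        else:
            # Assumption: the time column holds integer milliseconds.
            stamp = int(line.split(",")[time_index])
            # The first event goes out at once; out-of-order stamps are not delayed.
            delay_ms = 0 if previous is None else max(0, stamp - previous)
            previous = stamp
        yield delay_ms, line


def replay(lines, emit, time_index=None, fixed_delay_ms=0, sleep=time.sleep):
    """Wait for the computed delay, then hand each event to emit (e.g. a socket send)."""
    for delay_ms, line in replay_schedule(lines, time_index, fixed_delay_ms):
        if delay_ms:
            sleep(delay_ms / 1000.0)
        emit(line)
```

Passing an open file as `lines` and a function that writes to the receptor socket as `emit` turns this into a small replay tool; the `sleep` parameter exists only so the timing can be checked without actually waiting.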

After this step, the events have been picked up by the receptor and added to the basket datacell.bsktin. This table can be queried like any other table, but be aware that it may be emptied concurrently. The next step is to define a continuous query, which in this case passes the input received to the output channel. Reception and emission can be temporarily interrupted using the datacell.pause(objectname) operation.

After registration of the query, the datacell module contains the necessary optimized code for continuous query processing. The scheduler is subsequently restarted using datacell.resume(), which moves the data from bsktin into bsktout when it arrives. You can check the result using ordinary SQL queries over the table-producing functions datacell.receptors(), datacell.emitters(), datacell.baskets() and datacell.queries().

sql> call datacell.query('datacell.pass', 'insert into datacell.bsktout select * from datacell.bsktin;');