TUMBLING_TIME()
A tumbling time batch window holds and processes events that arrive during the time period as a batch.
Syntax
WINDOW TUMBLING_TIME(time <INT|LONG|TIME>)
WINDOW TUMBLING_TIME(time <INT|LONG|TIME>, start <INT|LONG>)
WINDOW TUMBLING_TIME(time <INT|LONG|TIME>, current.event <BOOL>)
WINDOW TUMBLING_TIME(time <INT|LONG|TIME>, start <INT|LONG>, current.event <BOOL>)
Query Parameters
| Name | Description | Default Value | Possible Data Types | Optional | Dynamic | 
|---|---|---|---|---|---|
| time | The batch time period in which the window process the events. | INT LONG TIME | No | No | |
| start | This specifies an offset in milliseconds in order to start the window at a time different to the standard time. | Timestamp of first event | INT LONG | Yes | No | 
| current.event | Let the window stream the current events out as and when they arrive to the window while expiring them in batches. | false | BOOL | Yes | No | 
Example 1
CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE SINK STREAM OutputStream (symbol string, price double);
@info(name = 'query1')
INSERT INTO OutputStream
SELECT symbol, sum(price) AS price
FROM InputEventStream WINDOW TUMBLING_TIME(20 sec);
This window collects and processes incoming events as a batch every 20 seconds and then outputs them to a stream.
Example 2
CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE SINK STREAM OutputStream (symbol string, sumPrice double);
@info(name = 'query1')
INSERT INTO OutputStream
SELECT symbol, sum(price) AS sumPrice
FROM InputEventStream WINDOW TUMBLING_TIME(20 sec, true);
This window sends the arriving events directly to the output letting the sumPrice to increase gradually and on every 20 second interval it clears the window as a batch resetting the sumPrice to zero.
Example 3
CREATE STREAM InputEventStream (symbol string, price float, volume int);
CREATE WINDOW StockEventWindow (symbol string, price float, volume int) TUMBLING_TIME(20 sec) output all events;
CREATE SINK STREAM OutputStream (symbol string, price double);
@info(name = 'query0')
INSERT INTO StockEventWindow
FROM InputEventStream;
@info(name = 'query1')
INSERT all events INTO OutputStream 
SELECT symbol, sum(price) AS price
FROM StockEventWindow;
This uses a defined window to process events arrived every 20 seconds as a batch and output all events.
Example 4
This example shows aggregating events over time in a batch (tumbling) manner.
Stream Worker Code
CREATE STREAM TemperatureStream(sensorId string, temperature double);
CREATE SINK STREAM OverallTemperatureStream(avgTemperature double, maxTemperature double, numberOfEvents long);
CREATE SINK STREAM SensorIdTemperatureStream(sensorId string, avgTemperature double, maxTemperature double);
@info(name = 'Overall-analysis')
-- Calculate average, maximum, and count for `temperature` attribute.
INSERT INTO OverallTemperatureStream
SELECT avg(temperature) AS avgTemperature,
       max(temperature) AS maxTemperature,
       count() AS numberOfEvents
-- Aggregate events every `1 minute`, from the arrival of the first event.
FROM TemperatureStream WINDOW TUMBLING_TIME(1 min);
@info(name = 'SensorId-analysis')
INSERT INTO SensorIdTemperatureStream
SELECT sensorId,
-- Calculate average, and maximum for `temperature`, by grouping events by `sensorId`.
       avg(temperature) AS avgTemperature,
       max(temperature) AS maxTemperature
-- Aggregate events every `30 seconds` from epoch timestamp `0`.
FROM TemperatureStream WINDOW TUMBLING_TIME(30 sec, 0)
GROUP BY sensorId
-- Output events only when `avgTemperature` is greater than `20.0`.
WHERE avgTemperature > 20.0;
Batch Time Aggregation Behavior
When events are sent to TemperatureStream, the following events are emitted at OverallTemperatureStream via the Overall-analysis query, and SensorIdTemperatureStream via the SensorId-analysis query.
| Time | Input to TemperatureStream | Output at OverallTemperatureStream | Output at SensorIdTemperatureStream | 
|---|---|---|---|
| 9:00:10 | [ '1001',21.0] | - | - | 
| 9:00:20 | [ '1002',25.0] | - | - | 
| 9:00:30 | - | - | [ '1001',21.0,21.0],['1002',25.0,25.0] | 
| 9:00:35 | [ '1002',26.0] | - | - | 
| 9:00:40 | [ '1002',27.0] | - | - | 
| 9:00:55 | [ '1001',19.0] | - | - | 
| 9:00:00 | - | - | [ '1002',26.5,26.0] | 
| 9:01:10 | - | [ 23.6,27.0,5] | - | 
| 9:01:20 | [ '1001',21.0] | - | - | 
| 9:01:30 | - | - | [ '1001',21.0,21.0] | 
| 9:02:10 | - | [ 21.0,21.0,1] | - |