The Aleri Streaming Platform has virtually unlimited uses. In the links below, we have provided descriptions of some of the more common designs people are implementing in the Aleri Streaming Platform. Included in each “how-to” is an example data model. In some cases, we have also provided example data sets to import into the platform. For further explanation or to learn about the many other uses, please contact us.
Suppose you have a stream of data that receives many updates, such as a stock market feed that keeps the last price tick for each symbol.
Furthermore, suppose that some of the downstream calculations are computationally expensive, and don't need to be recalculated on every update. How can you collect and "pulse" the updates so that the recalculations are done periodically instead of continuously?
Three features in the Aleri Streaming Platform---FlexStreams, the dictionary data structure, and timers---allow you to solve this "internal pulsing" problem.
The goal is to build a FlexStream from an input stream "InStream". The following defines two local variables in the FlexStream:
int32 version := 0;
dictionary(typeof(InStream), int32) versionMap;
These two variables keep a current version, and a version number for each record. The SPLASH code handling events from the input stream is
{ versionMap[InStream] := version; }
The special Timer block within the FlexStream sends the inserts and updates:
{
for (k in versionMap) {
if (version = versionMap[k]) output setOpcode(k, upsert);
}
version := version + 1;
}
You can configure the interval between runs of the Timer block as a number of seconds. On each run, only those events with the current version get sent downstream, and the version number gets incremented for the next set of updates.
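The pulsing logic above can also be pictured outside the platform. The following is a minimal Python sketch, not Aleri code: the class name `Pulser`, its method names, and the choice to key records by symbol are assumptions made for illustration.

```python
class Pulser:
    """Collects keyed updates and releases them in periodic batches."""

    def __init__(self):
        self.version = 0        # current pulse number
        self.version_map = {}   # key -> (version at last update, latest record)

    def on_event(self, key, record):
        # Plays the role of: versionMap[InStream] := version;
        self.version_map[key] = (self.version, record)

    def on_timer(self):
        # Emit only records touched since the previous pulse, then advance the version.
        batch = [rec for ver, rec in self.version_map.values() if ver == self.version]
        self.version += 1
        return batch


pulser = Pulser()
pulser.on_event("IBM", {"symbol": "IBM", "price": 43.11})
pulser.on_event("IBM", {"symbol": "IBM", "price": 43.17})  # replaces the earlier tick
pulser.on_event("ALR", {"symbol": "ALR", "price": 22.08})
first = pulser.on_timer()    # latest IBM tick and the ALR tick
second = pulser.on_timer()   # empty: nothing changed since the last pulse
```

Downstream work then runs once per pulse rather than once per tick, which is the point of the pattern.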
You can download the complete data model to experiment with it.
[Note: the Aleri Streaming Platform has a related feature, called "pulsed subscription", that allows an external client to coalesce changes every n seconds. But that feature is specifically for data sent to the outside world, rather than sent internally to other streams.]
You generally want to only keep a small portion of the data in Complex Event Processing (CEP) applications. In the Aleri Streaming Platform, windows are used to automatically purge data. But occasionally, you might need to keep track of statistics for all of the events, not just those in the window. How do you solve this "maintaining statistics" problem?
The Aleri Streaming Platform has a dictionary data structure that can be used to solve this common problem.
A concrete example of this problem is keeping track of the minimum and maximum price of stock trades. The data model (with associated test data) has a source stream "Trades" with fields
int32 Id
string Symbol
double Price
int32 Shares
and a FlexStream whose output has fields
string Symbol
double Price
int32 Shares
double maxPrice
double minPrice
The FlexStream keeps two local dictionaries with declarations
dictionary(string, double) maxv;
dictionary(string, double) minv;
The SPLASH code handling events from "Trades" is
{
double curmax := maxv[Trades.Symbol];
double curmin := minv[Trades.Symbol];
if (getOpcode(Trades) = delete) {
exit;
}
if (isnull(curmax) or (curmax < Trades.Price)) {
maxv[Trades.Symbol] := Trades.Price;
}
if (isnull(curmin) or (curmin > Trades.Price)) {
minv[Trades.Symbol] := Trades.Price;
}
output setOpcode([Symbol = Trades.Symbol | Price = Trades.Price; Shares = Trades.Shares;
maxPrice = maxv[Trades.Symbol]; minPrice = minv[Trades.Symbol] ], upsert);
}
This method can be extended to other statistics of interest, such as keeping the average price over all events.
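As an illustration of that extension, here is a minimal Python sketch of the same dictionary idea with a running average added. It is not SPLASH; the function name `on_trade` and the `total`/`count` dictionaries are assumptions introduced for the example.

```python
max_price = {}   # symbol -> highest price seen so far
min_price = {}   # symbol -> lowest price seen so far
total = {}       # symbol -> sum of all prices seen (for the average)
count = {}       # symbol -> number of trades seen


def on_trade(symbol, price, shares, opcode="insert"):
    """Update the all-time statistics and return the enriched record."""
    if opcode == "delete":
        return None  # deletes leave the statistics unchanged, as in the SPLASH code
    if symbol not in max_price or max_price[symbol] < price:
        max_price[symbol] = price
    if symbol not in min_price or min_price[symbol] > price:
        min_price[symbol] = price
    total[symbol] = total.get(symbol, 0.0) + price
    count[symbol] = count.get(symbol, 0) + 1
    return {
        "Symbol": symbol,
        "Price": price,
        "Shares": shares,
        "maxPrice": max_price[symbol],
        "minPrice": min_price[symbol],
        "avgPrice": total[symbol] / count[symbol],
    }


on_trade("IBM", 43.11, 1000)
on_trade("IBM", 43.17, 900)
latest = on_trade("IBM", 42.66, 800)
```

Because the dictionaries live outside any window, the statistics cover every event ever seen, while memory use grows only with the number of distinct symbols.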
Here's a common idiom for maintaining a collection of ordered records that makes use of the data structures in SPLASH as well as variables and loops.
One example inspired by stock trading maintains the top of an order book. Suppose there is a stream called "Bid" of bids for stocks (the example is kept simple by not considering the "offer" side). Suppose the stream has the fields
int32 Id
string Symbol
double Price
int32 Shares
and carries events such as
insert: Id=1, Symbol='IBM', Price=43.11, Shares=1000
insert: Id=2, Symbol='ALR', Price=22.08, Shares=200
update: Id=1, Symbol='IBM', Price=43.17, Shares=900
The goal is to output the top three bids for a symbol, ranked by price, in a stream with the fields
int32 Position
string Symbol
double Price
int32 Shares
For example, the input events
insert: Id=1, Symbol='IBM', Price=43.11, Shares=1000
insert: Id=2, Symbol='IBM', Price=43.17, Shares=900
insert: Id=3, Symbol='IBM', Price=42.66, Shares=800
insert: Id=4, Symbol='IBM', Price=45.81, Shares=50
insert: Id=5, Symbol='IBM', Price=46.41, Shares=75
should produce the output
insert: Position=1, Symbol='IBM', Price=46.41, Shares=75
insert: Position=2, Symbol='IBM', Price=45.81, Shares=50
insert: Position=3, Symbol='IBM', Price=43.17, Shares=900
To solve the problem, you first need a way to remember previous bids. In the Aleri Streaming Platform, there's a data structure called an "event cache" for storing previous events. An event cache holds a number of events grouped into buckets. For the order book problem, you can use the event cache declaration
eventCache(Bid[Symbol], coalesce, Price desc) previous;
Event caches also allow other options, but these are the only ones needed for the order book example.
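One way to picture such an event cache is as a set of per-symbol buckets kept in descending price order. The Python sketch below is only an analogy: the class `EventCache` and its methods are hypothetical, and it assumes that "coalesce" means a later event with the same Id replaces the stored one.

```python
class EventCache:
    """Per-symbol buckets of bids, read back in descending price order."""

    def __init__(self):
        self.buckets = {}  # Symbol -> {Id: latest event with that Id}

    def add(self, event):
        # Assumed coalesce behavior: an update to a known Id replaces the old event.
        bucket = self.buckets.setdefault(event["Symbol"], {})
        bucket[event["Id"]] = event

    def top(self, symbol, n=3):
        # Equivalent in spirit to reading the first n entries of a "Price desc" bucket.
        events = self.buckets.get(symbol, {}).values()
        return sorted(events, key=lambda e: e["Price"], reverse=True)[:n]


cache = EventCache()
for bid_id, price, shares in [(1, 43.11, 1000), (2, 43.17, 900), (3, 42.66, 800),
                              (4, 45.81, 50), (5, 46.41, 75)]:
    cache.add({"Id": bid_id, "Symbol": "IBM", "Price": price, "Shares": shares})

top_prices = [e["Price"] for e in cache.top("IBM")]
```

With the five IBM bids from the example, the three highest-priced entries are the ones that would occupy positions 1 to 3.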
For the second step, you need to process events from the Bid stream. That's possible using the following bit of SPLASH code, which is run automatically for every event:
{
int32 i := 0;
string s := Bid.Symbol;
while ((i < count(previous.Id)) and (i < 3 ) ) {
output setOpcode([ Position=i+1; Symbol=s; Price=nth(i,previous.Price); Shares=nth(i,previous.Shares) ], upsert);
i := i + 1;
}
while (i < 3) {
output setOpcode([ Position=i+1; Symbol=s], safedelete);
i := i + 1;
}
}
The Aleri Streaming Platform allows you to manipulate structured XML data, as most databases do. A value of "xml" type is a tree structure like
<Address>
<Name value="George Bush"/>
<Street value="100 Dusty Road"/>
<City value="Crawford"/>
<State value="Texas"/>
</Address>
The operations are a subset of the SQL/XML standard. You create variables holding xml values and assign values to them, such as
xml x1 := xmlparse('<element/>');
and convert them back and forth to strings.
Here's an example model and small data set that demonstrates one use of the xml data type: creating joins that propagate changes in an atomic way. It takes in events with fields
string department
string manager
and events with fields
string department
string member
and outputs structures like
<department name="Software Principles">
<member name="Elsa Gunter"/>
<member name="Amy Felty"/>
<member name="John Reppy"/>
</department>
If you look at the model in the Aleri Studio, you'll see a diagram

with two source streams "departments" and "members", a Join Stream "fullDept" with fields
string department
string member
string manager
an Aggregate Stream "groupDept" with fields
string department
string xmlStructure
and a similar Aggregate Stream "groupMgr". These aggregations make the changes from the Join Stream atomic. The above structure will be altered whenever a new department is added or a member of the department changes. Those changes are harder to see from the Join Stream alone. For example, if a member of a department changes, the Join Stream will only output the changed row instead of the full department.
It's instructive to look at the computation inside "groupDept". The expression for computing the above structure is
xmlserialize(xmlelement(department, xmlattributes(fullDept.department as "name"), xmlagg(xmlelement(member, xmlattributes(fullDept.member as "name")))))
The important operation is the "xmlagg" function, which aggregates a number of xml values into one xml value. More precisely, each "member" element from the "fullDept" stream is copied into the structure, which is wrapped in a "department" tag at the top level.
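To make the shape of that aggregation concrete, here is a rough Python analogue using the standard library's `xml.etree.ElementTree`. It is not the platform's xml type; the function name `department_xml` is an assumption, and the exact whitespace of the serialized text may differ from what the platform produces.

```python
import xml.etree.ElementTree as ET


def department_xml(department, members):
    """Wrap one <member> element per row in a single <department> element."""
    dept = ET.Element("department", {"name": department})   # like xmlelement + xmlattributes
    for member in members:                                   # like xmlagg over the group's rows
        ET.SubElement(dept, "member", {"name": member})
    return ET.tostring(dept, encoding="unicode")             # like xmlserialize


xml_text = department_xml("Software Principles",
                          ["Elsa Gunter", "Amy Felty", "John Reppy"])
```

Each call rebuilds the whole department structure, which mirrors how the aggregate stream republishes the full department when one member row changes.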
Future releases will have more functionality added to the xml data type, such as searching and replacing elements using XPath expressions.
We begin our exploration of CEP design patterns with basic filtering. While filtering is easy to implement with a number of non-CEP products, or with custom-built applications, this example will help us set up a framework for describing other, more complex patterns.

The above diagram depicts a simple filter query. The query subscribes to one stream, evaluates a specified logical condition based on event attributes, and, if the condition is true, publishes the event to the destination stream. For example, an application monitoring a stream of purchase orders may filter out all orders where the condition is Priority != ‘High’ and Amount < 100000.
This example presents the simplest kind of filter, where events are evaluated one by one, and where the query condition only involves the attributes of one event. It is also possible to construct many other more complex filters, for example, filters that compare events to other events in the same stream, or in another stream, or compare events to a computed metric. For instance, a filter might capture orders where the purchase amount is larger than the previous purchase amount, or purchase amounts that are larger than the average for the previous day. Such relatively more complex queries are discussed later in this document.
Filtering is ubiquitous in CEP applications. Here are some examples:
Filter queries in CCL look much like SQL queries. Unlike SQL queries, however, CCL queries execute continuously. The query shown below subscribes to StreamAllPOs, continuously evaluates the condition for all events, and sends those events that match the condition to StreamImportantPOs.
INSERT INTO StreamImportantPOs SELECT * FROM StreamAllPOs WHERE Priority = 'High' OR Amount >= 100000;
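For readers who think in general-purpose code, the same one-event-at-a-time filter can be sketched as a Python generator. This is an illustration only; the function name `important_orders` and the dictionary representation of an event are assumptions.

```python
def important_orders(orders):
    """Yield each purchase order that matches the filter condition, as it arrives."""
    for order in orders:
        if order["Priority"] == "High" or order["Amount"] >= 100000:
            yield order


incoming = [
    {"Id": 1, "Priority": "Low", "Amount": 500},
    {"Id": 2, "Priority": "High", "Amount": 500},
    {"Id": 3, "Priority": "Low", "Amount": 250000},
]
passed_ids = [order["Id"] for order in important_orders(incoming)]
```

A generator is used because it evaluates events lazily, one by one, which is the closest plain-Python counterpart to a continuously executing query.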
This is the first of the design patterns considered in this document, where multiple events are kept in memory. In-memory data caching is the foundation of most CEP design patterns. The cache typically stores two kinds of data:
Note that, although we are describing an in-memory cache here, many applications require this cache to be persistent. This means that, if a machine that hosts the CEP engine fails, the data kept in windows is not lost. This functionality is even more important when the window holds not just the last few seconds’, but minutes’, hours’, days’, and even weeks’ worth of events.
In-memory caching is used in every non-trivial CEP application. Here are just a few examples:
In this example, we will focus on the first form of caching, namely, caching events from a data stream. The Database Lookup design pattern, presented later in this document, discusses the caching of database data.
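Caching stream events in a window can be sketched in Python as follows. This is a simplified illustration, not engine code: the class `TimeWindow`, the `last_values` dictionary, and the rule that an event expires once it is exactly the retention period old are all assumptions.

```python
from collections import deque


class TimeWindow:
    """Keeps events for a fixed number of seconds, oldest first."""

    def __init__(self, keep_seconds):
        self.keep_seconds = keep_seconds
        self.events = deque()  # (timestamp, event) pairs in arrival order

    def insert(self, timestamp, event):
        self.events.append((timestamp, event))
        self.expire(timestamp)

    def expire(self, now):
        # Drop events that have been held for keep_seconds or longer.
        while self.events and self.events[0][0] <= now - self.keep_seconds:
            self.events.popleft()

    def count_matching(self, field, value):
        return sum(1 for _, event in self.events if event[field] == value)


recent = TimeWindow(keep_seconds=600)   # ten minutes of history
last_values = {}                        # latest event per IPAddress

for ts, ip in [(0, "10.0.0.1"), (300, "10.0.0.1"), (700, "10.0.0.2")]:
    click = {"IPAddress": ip, "URLvisited": "/index.html", "FileSize": 1024}
    recent.insert(ts, click)
    last_values[ip] = click

seen_recently = recent.count_matching("IPAddress", "10.0.0.1")
```

The time-based window and the per-key dictionary correspond to two different retention policies: keep everything for a period, or keep only the most recent value per key.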
Windows
-- A window to keep data for ten minutes, populated by stream StreamClicks
CREATE WINDOW RecentHistory(IPAddress LONG, URLvisited STRING, FileSize LONG)
KEEP 10 MINUTES;
INSERT INTO RecentHistory
SELECT *
FROM StreamClicks;
-- A window to keep the last value for each IPAddress:
CREATE WINDOW LastValues(IPAddress LONG, URLvisited STRING, FileSize LONG)
KEEP LAST PER IPAddress;
-- A window that keeps the 1000 largest downloads. Each download is kept for one hour, or until a larger download displaces it from the window:
CREATE WINDOW LargestDownloads(IPAddress LONG, URLvisited STRING, FileSize LONG)
KEEP 1000 LARGEST BY FileSize KEEP 1 HOUR;
-- Use the RecentHistory window defined above to check how many other records with
-- the same IP address have been seen recently
INSERT INTO StreamNewIP
SELECT S.*, Count(H)
FROM StreamClicks AS S,
RecentHistory AS H
WHERE S.IPAddress = H.IPAddress;
Unlike the previous design pattern, this pattern does not merely keep events in memory, but uses the stored values to compute various statistics. A typical example here would involve computing a running average over a sliding window. (As we have seen in the previous example, a window is an object that holds a set of events in memory.)

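A running average over a sliding window can be sketched in a few lines of Python. The sketch assumes a count-based window (the last N values), which is only one of the possible window policies; the class name `SlidingAverage` is hypothetical.

```python
from collections import deque


class SlidingAverage:
    """Running average over the most recent `size` values."""

    def __init__(self, size):
        self.size = size
        self.values = deque()
        self.total = 0.0

    def add(self, value):
        # Add the new value, evict the oldest if the window is full, return the average.
        self.values.append(value)
        self.total += value
        if len(self.values) > self.size:
            self.total -= self.values.popleft()
        return self.total / len(self.values)


avg = SlidingAverage(size=3)
results = [avg.add(price) for price in [10, 20, 30, 40]]
```

Keeping a running total means each new event costs constant work, rather than re-summing the whole window on every update.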
This design pattern comes in quite a few flavors, differing along the following dimensions:
Windows-based computations are used in a wide variety of applications. For example:
-- Compute one-minute “bars” (Avg, Max, Min, Closing) and VWAP (Volume-Weighted
-- Average Price). Output results continuously, as soon as results change.
INSERT INTO OneMinuteBarView
SELECT Symbol, AVG(Price), MAX(Price), MIN(Price), SUM(Price*Volume)/SUM(Volume)
FROM StreamFeed KEEP 1 MINUTE;
-- Determine the total bandwidth consumed by each IP address for the
-- last 1000 downloads. Output results every 15 seconds.
INSERT INTO StreamConsumedBandwidth
SELECT IP, SUM(FileSize)
FROM StreamDownloadLog KEEP 1000 ROWS
GROUP BY IP
OUTPUT EVERY 15 SECONDS;
While there are applications that deal exclusively with real-time events, most useful applications refer to historical data or reference data to enrich the incoming events. The following diagram shows how:
In SQL terminology, this design pattern implements a join between an incoming data stream and a database table.
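The stream-to-table join can be imitated in Python with the standard `sqlite3` module standing in for the external database. Everything here is illustrative: the in-memory table, its sample row, and the function name `enrich` are assumptions, and returning `None` for unmatched events is one possible choice (inner-join behavior).

```python
import sqlite3

# An in-memory database stands in for the real reference database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE product_info (tag_id TEXT PRIMARY KEY, sku TEXT, product_name TEXT)"
)
conn.execute("INSERT INTO product_info VALUES ('TAG-1', 'SKU-100', 'Widget')")  # sample row


def enrich(event):
    """Look up reference data for one incoming event and merge it into the event."""
    row = conn.execute(
        "SELECT sku, product_name FROM product_info WHERE tag_id = ?",
        (event["TagID"],),
    ).fetchone()
    if row is None:
        return None  # no match: drop the event, as an inner join would
    return {"TagID": event["TagID"], "SKU": row[0], "ProductName": row[1]}


matched = enrich({"TagID": "TAG-1"})
unmatched = enrich({"TagID": "TAG-404"})
```

One query per event is the simplest form; at high event rates a cache of recent lookups in front of the database is a common refinement.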
The more advanced implementations of this design pattern often require:
This design pattern is widely applicable. For example:
In these examples, the SQL lookup code (delimited by [[ ... ]]) is executed in the database, not in the Coral8 Engine.
-- Look up the SKU and the name of the product for each RFID event, via the Tag ID.
INSERT INTO
StreamRFIDEventsEnriched
SELECT
StreamRFIDEvents.TagID, DbResult.SKU, DbResult.ProductName
FROM StreamRFIDEvents,
(DATABASE "OracleDb"
SCHEMA "schemas/product_info.ccs"
-- SQL Code to perform lookup:
[[SELECT sku, product_name
FROM product_info pi
WHERE pi.tag_id = ?StreamRFIDEvents.TagID]]) AS DbResult;
-- Look up yesterday’s closing prices for each incoming trade.
INSERT INTO
StreamTradesEnriched
SELECT
InTrades.Symbol, InTrades.Price, DbResult.ClosingPrice
FROM StreamTrades AS InTrades,
(DATABASE "OracleDb"
SCHEMA "schemas/closing-price.ccs"
-- SQL Code to perform lookup:
[[SELECT closing_price
FROM price_history ph
WHERE ph.symbol = ?InTrades.Symbol
AND ph.closing_date > current_date-2
AND ph.closing_date <= current_date-1]]
) AS DbResult;
While the CEP Engine can store large volumes of events in windows, it is often necessary to write raw or processed (filtered, aggregated, correlated, and so on) events into a traditional relational database. A relational database can manage very large volumes of data for very long periods of time, and it also supports a number of interfaces that other applications can use to retrieve the data. This design pattern illustrates the complementary nature of databases and CEP engines.

Note that, if the database must store large volumes of events, this design pattern may call for a number of advanced techniques, such as batching, asynchronous writing (to avoid blocking), queuing (to handle spikes), concurrent writes, writing via native database interfaces, and so on.
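Of the techniques just listed, batching is the easiest to sketch. The Python example below uses the standard `sqlite3` module as a stand-in database; the class `BatchWriter`, the two-column table layout, and the batch size are assumptions chosen for brevity.

```python
import sqlite3


class BatchWriter:
    """Buffers rows and writes them to the database in batches."""

    def __init__(self, conn, batch_size):
        self.conn = conn
        self.batch_size = batch_size
        self.pending = []

    def write(self, row):
        self.pending.append(row)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        if not self.pending:
            return
        with self.conn:  # one transaction (and one commit) per batch
            self.conn.executemany(
                "INSERT INTO one_minute_summary VALUES (?, ?)", self.pending
            )
        self.pending.clear()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE one_minute_summary (symbol TEXT, avg_price REAL)")
writer = BatchWriter(conn, batch_size=2)
for row in [("IBM", 43.1), ("ALR", 22.0), ("IBM", 43.2)]:
    writer.write(row)
stored_before_flush = conn.execute("SELECT COUNT(*) FROM one_minute_summary").fetchone()[0]
writer.flush()  # write the remaining partial batch
stored_after_flush = conn.execute("SELECT COUNT(*) FROM one_minute_summary").fetchone()[0]
```

Grouping rows into one transaction reduces per-row commit overhead; a production writer would also flush on a timer so that a partial batch does not wait indefinitely.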
This design pattern cuts across a wide range of applications, such as:
-- Store aggregated information, calculated every minute, based on a 10-minute
-- sliding window, into an Oracle database table called one_minute_summary.
EXECUTE STATEMENT
DATABASE "MyOracleDb"
[[
-- SQL code to perform an INSERT. Can be SQL or PL/SQL or T-SQL
INSERT INTO one_minute_summary
VALUES(?symbol, ?closing_price, ?avg_price, ?max_price, ?min_price, ?vwap);
]]
]]
SELECT Symbol AS symbol,
Price AS closing_price,
AVG(Price) AS avg_price,
MAX(Price) AS max_price,
MIN(Price) AS min_price,
SUM(Price*Volume)/SUM(Volume) AS vwap
FROM StreamFeed KEEP 10 MINUTES
OUTPUT EVERY 1 MINUTE;
While simple applications often look at just one stream at a time, most advanced applications must look at and correlate events across multiple streams. A join in a CEP application shares many characteristics with a join in SQL.

In CEP, a join between two data streams necessarily involves one or more windows. Streams do not store events, but pass events to, from, and between queries. To perform a join, it is necessary to store some events in memory, to wait for events on the corresponding stream. This is what a window does. The above diagram depicts a stream to window join. Events arriving in Stream 2 are stored in the window. Events arriving in Stream 1 are joined with events stored in the window, and the matching pairs are published by the join. See the CCL Examples section for specific examples.
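The stream-to-window join described above can be sketched in Python as follows. This is a simplified model, not engine code: the class `WindowJoin`, its method names, the join key `Symbol`, and the expiry rule are assumptions. The dictionary `index` plays the role of a window index that is updated on both insertion and expiry.

```python
from collections import defaultdict, deque


class WindowJoin:
    """Joins Stream 1 events against a time window kept on Stream 2."""

    def __init__(self, keep_seconds):
        self.keep_seconds = keep_seconds
        self.window = deque()            # (timestamp, event) in arrival order
        self.index = defaultdict(list)   # Symbol -> events currently in the window

    def _expire(self, now):
        while self.window and self.window[0][0] <= now - self.keep_seconds:
            _, old = self.window.popleft()
            self.index[old["Symbol"]].remove(old)   # index update on expiry

    def on_stream2(self, timestamp, event):
        self._expire(timestamp)
        self.window.append((timestamp, event))
        self.index[event["Symbol"]].append(event)   # index update on insert

    def on_stream1(self, timestamp, event):
        self._expire(timestamp)
        # Publish one (stream 1, stream 2) pair per matching window event.
        return [(event, match) for match in self.index.get(event["Symbol"], [])]


join = WindowJoin(keep_seconds=10)
join.on_stream2(0, {"Symbol": "IBM", "Price": 43.11})
join.on_stream2(5, {"Symbol": "IBM", "Price": 43.17})
early = join.on_stream1(8, {"Symbol": "IBM", "Shares": 100})    # both window events match
later = join.on_stream1(12, {"Symbol": "IBM", "Shares": 100})   # the first has expired
```

Looking matches up through the index avoids scanning the whole window for every Stream 1 event, at the cost of maintaining the index as events enter and leave.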
Other kinds of interesting CEP joins include:
Even at low data rates, joins are very CPU-intensive. Thus, most CEP joins require heavy indexing. The indices used in CEP applications are similar to the ones used in relational databases, but there is at least one important difference: CEP indices must be highly dynamic. For example, if a window that stores the last 10 minutes’ worth of data is indexed, the index must be updated both when new events enter the window and when old events expire from the window. At 10,000 events per second, that’s 20,000 index updates per second! A good CEP engine creates proper indices for windows without requiring the user to specify them.
Most sophisticated CEP applications employ joins. Here are some examples:
-- Stream to Window Join: keep a 10 second window on stream StreamFeed2,
-- and join all the events from StreamFeed1 with this window.
INSERT INTO StreamMatches
SELECT StreamFeed1.*, StreamFeed2.*
FROM StreamFeed1,
StreamFeed2 KEEP 10 SECONDS
WHERE StreamFeed1.Symbol = StreamFeed2.Symbol;
-- Same as above, but perform a left outer join: produces outputs
-- (containing some NULLS) even if no matches are found.
INSERT INTO StreamMatches
SELECT StreamFeed1.*, StreamFeed2.*
FROM
StreamFeed1
LEFT OUTER JOIN
StreamFeed2 KEEP 10 SECONDS
ON
StreamFeed1.Symbol = StreamFeed2.Symbol;
-- Window to Window join: keep windows on both streams. This example correlates alerts from a Virus Checker System and an Intrusion Detection System.
INSERT INTO StreamAlertCommonIP
SELECT StreamIDSAlerts.IP
FROM
StreamIDSAlerts KEEP 10 MINUTES,
StreamVirusCheckAlerts KEEP 10 MINUTES
WHERE
StreamIDSAlerts.IP = StreamVirusCheckAlerts.IP;