How To's / Common Design Patterns

The Aleri Streaming Platform supports a wide range of applications. The sections below describe some of the more common designs that people implement with the Aleri Streaming Platform. Each “how-to” includes an example data model, and some also include example data sets to import into the platform. For further explanation, or to learn about other uses, please contact us.

  • Internal Pulsing for Controlling Recalculation

    Suppose you have a stream of data that receives many updates, such as a stock market feed that keeps the last price tick for each symbol.
    Furthermore, suppose that some of the downstream calculations are computationally expensive, and don't need to be recalculated on every update. How can you collect and "pulse" the updates so that the recalculations are done periodically instead of continuously?

    Three features of the Aleri Streaming Platform (FlexStreams, the dictionary data structure, and timers) let you solve this "internal pulsing" problem.

    The goal is to build a FlexStream from an input stream "InStream". The following defines two local variables in the FlexStream:

      int32 version := 0;
      dictionary(typeof(InStream), int32) versionMap;

    These two variables hold the current version and, for each record, the version in which that record was last updated. The SPLASH code that handles events from the input stream is

      { versionMap[InStream] := version; }

    The special Timer block within the FlexStream sends the inserts and updates:

      {
        for (k in versionMap) {
          if (version = versionMap[k]) output setOpcode(k, upsert);
        }
        version := version + 1;
      }

    You can configure the Timer block to run every n seconds. On each run, only those records whose version equals the current version (that is, records that changed since the previous run) are sent downstream, and the version number is then incremented for the next set of updates.
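    To show the mechanics outside the platform, here is a minimal Python sketch of the same versioning idea. It is an illustration under stated assumptions, not Aleri code. The class and method names (Pulser, on_event, on_timer) are hypothetical, records are plain dicts keyed by a caller-supplied key, and the caller is assumed to invoke on_timer once per interval.

```python
from typing import Dict, Hashable, List


class Pulser:
    """Coalesces keyed updates and releases them on each timer tick.

    Mirrors the SPLASH pattern: a global version counter plus a map from
    each record key to the version in which that key was last updated.
    (Hypothetical helper, not an Aleri API.)
    """

    def __init__(self) -> None:
        self.version = 0
        self.version_map: Dict[Hashable, int] = {}
        self.latest: Dict[Hashable, dict] = {}  # last record seen for each key

    def on_event(self, key: Hashable, record: dict) -> None:
        # Counterpart of: versionMap[InStream] := version;
        self.version_map[key] = self.version
        self.latest[key] = record

    def on_timer(self) -> List[dict]:
        # Emit only keys updated since the previous tick (the "upserts"),
        # then bump the version so the next tick starts a fresh batch.
        out = [self.latest[k] for k, v in self.version_map.items() if v == self.version]
        self.version += 1
        return out
```

    In this sketch, three updates before a tick yield two output records: the latest IBM value and the ALR value. A tick with no intervening updates yields nothing. As in the SPLASH version, the map keeps every key it has ever seen.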

    You can download the complete data model to experiment with it.

    [Note: the Aleri Streaming Platform has a related feature, called "pulsed subscription", that allows an external client to coalesce changes every n seconds. But that feature is specifically for data sent to the outside world, rather than sent internally to other streams.]

  • Keeping Statistics over All Events

    In Complex Event Processing (CEP) applications, you generally want to keep only a small portion of the data. In the Aleri Streaming Platform, windows are used to purge data automatically. Occasionally, however, you might need to keep track of statistics for all of the events, not just those in the window. How do you solve this "maintaining statistics" problem?

    The Aleri Streaming Platform has a dictionary data structure that can be used to solve this common problem.

    A concrete example of this problem is keeping track of the minimum and maximum price of stock trades. The data model (with associated test data) has a source stream "Trades" with fields

      int32 Id
      string Symbol
      double Price
      int32 Shares

    and a time-based window that keeps 10 seconds' worth of data. The FlexStream "allMinMax" outputs events with fields

      string Symbol
      double Price
      int32 Shares
      double maxPrice
      double minPrice

    The Price and Shares fields are the last values for the particular symbol; the maxPrice and minPrice are the maximum and minimum respectively over all the events, not just those in the window.

    The FlexStream keeps two local dictionaries with declarations

      dictionary(string, double) maxv;
      dictionary(string, double) minv;

    and maintains them with the following bit of SPLASH code:
       {
         double curmax := maxv[Trades.Symbol];
         double curmin := minv[Trades.Symbol];
         if (getOpcode(Trades) = delete) {
           exit;
         }
         if (isnull(curmax) or (curmax < Trades.Price)) {
           maxv[Trades.Symbol] := Trades.Price;
         }
         if (isnull(curmin) or (curmin > Trades.Price)) {
           minv[Trades.Symbol] := Trades.Price;
         }
         output setOpcode([Symbol = Trades.Symbol | Price = Trades.Price; Shares = Trades.Shares;
         maxPrice = maxv[Trades.Symbol]; minPrice = minv[Trades.Symbol] ], upsert);
      }

    The first two lines look up the current maximum and minimum for the symbol in the dictionaries. The following "if" statement stops processing if the event is a delete, which the window issues when an event expires; deletes must not change the all-time statistics. The next two "if" statements update the maximum and minimum if necessary. Finally, the code creates the output event and sends it to other streams.
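    The same logic can be sketched in Python for readers who want to experiment outside the platform. This is a hypothetical helper; AllTimeMinMax is not an Aleri API. Opcodes are assumed to arrive as the strings "insert", "update", and "delete", and the returned dict stands in for the upsert output event.

```python
from typing import Dict, Optional


class AllTimeMinMax:
    """Tracks per-symbol min/max price over every event ever seen."""

    def __init__(self) -> None:
        self.maxv: Dict[str, float] = {}
        self.minv: Dict[str, float] = {}

    def on_event(self, opcode: str, symbol: str, price: float,
                 shares: int) -> Optional[dict]:
        # Deletes come from the window expiring old events; ignoring them
        # keeps the statistics over all events, not just the window.
        if opcode == "delete":
            return None
        cur_max = self.maxv.get(symbol)
        cur_min = self.minv.get(symbol)
        if cur_max is None or cur_max < price:
            self.maxv[symbol] = price
        if cur_min is None or cur_min > price:
            self.minv[symbol] = price
        # Stand-in for the upserted output row for this symbol.
        return {"Symbol": symbol, "Price": price, "Shares": shares,
                "maxPrice": self.maxv[symbol], "minPrice": self.minv[symbol]}
```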

    This method can be extended to other statistics of interest, such as keeping the average price over all events.
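    As a minimal sketch of that extension, an all-time average needs only two dictionaries per symbol: a running sum and a count. It does not need the full price history. The Python below is hypothetical (AllTimeAverage is not an Aleri API) and assumes opcodes arrive as the strings "insert", "update", and "delete".

```python
from collections import defaultdict
from typing import DefaultDict, Optional


class AllTimeAverage:
    """Per-symbol average price over every non-delete event."""

    def __init__(self) -> None:
        self.total: DefaultDict[str, float] = defaultdict(float)  # sum of prices
        self.count: DefaultDict[str, int] = defaultdict(int)      # events counted

    def on_event(self, opcode: str, symbol: str, price: float) -> Optional[float]:
        # Window expirations (deletes) do not affect all-time statistics.
        if opcode == "delete":
            return None
        self.total[symbol] += price
        self.count[symbol] += 1
        return self.total[symbol] / self.count[symbol]
```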

  • Maintaining the Top of the Order Book

    Here's a common idiom for maintaining a collection of ordered records that makes use of the data structures in SPLASH as well as variables and loops.

    One example, inspired by stock trading, maintains the top of an order book. Suppose there is a stream called "Bid" that carries bids on stocks (to keep the example simple, the "offer" side is not considered). Suppose the stream has the fields

      int32 Id
      string Symbol
      double Price
      int32 Shares

    where Id is the key field, that is, the field that uniquely identifies a bid. Bids can be changed, so the stream might not only insert a new bid but also update or delete a previous bid. For instance, the Bid stream might have the following events:

      insert: Id=1, Symbol='IBM', Price=43.11, Shares=1000
      insert: Id=2, Symbol='ALR', Price=22.08, Shares=200
      update: Id=1, Symbol='IBM', Price=43.17, Shares=900

    The goal is to output the three highest bids for a stock any time one of its bids is inserted or changed. The fields in the output are

      int32 Position
      string Symbol
      double Price
      int32 Shares

    where Position ranges from 1 to 3. The key fields in the output are Position and Symbol. For example, if the events in the Bid stream have been
      insert: Id=1, Symbol='IBM', Price=43.11, Shares=1000
      insert: Id=2, Symbol='IBM', Price=43.17, Shares=900
      insert: Id=3, Symbol='IBM', Price=42.66, Shares=800
      insert: Id=4, Symbol='IBM', Price=45.81, Shares=50

    and the next event is
      insert: Id=5, Symbol='IBM', Price=46.41, Shares=75

    the stream should output
      insert: Position=1, Symbol='IBM', Price=46.41, Shares=75
      insert: Position=2, Symbol='IBM', Price=45.81, Shares=50
      insert: Position=3, Symbol='IBM', Price=43.17, Shares=900

    Note how the new bid appears at the top, because it has the highest price.

    To solve the problem, you first need a way to remember previous bids. In the Aleri Streaming Platform, there's a data structure called an "event cache" for storing previous events. An event cache holds a number of events grouped into buckets. For the order book problem, you can use the event cache declaration

      eventCache(Bid[Symbol], coalesce, Price desc) previous;

    The above fragment declares a variable called "previous" to hold the last events from the Bid stream. This event cache declaration specifies:

    • The stream of events that the event cache remembers (namely Bid);
    • The field or fields on which events will be grouped (namely Symbol);
    • The option "coalesce", meaning that inserts and updates should be coalesced into single records; and
    • A means of ordering the events in the group, here ordered by descending order of the Price field.

    Event caches also allow other options, but these are the only ones needed for the order book example.

    For the second step, you need to process events from the Bid stream. That's possible using the following bit of SPLASH code, which is run automatically for every event:

      {
        int32 i := 0;
        string s := Bid.Symbol;
        while ((i < count(previous.Id)) and (i < 3 ) ) {
          output setOpcode([ Position=i+1; Symbol=s; Price=nth(i,previous.Price); Shares=nth(i,previous.Shares) ], upsert);
          i := i + 1;
        }
        while (i < 3) {
          output setOpcode([ Position=i+1; Symbol=s], safedelete);
          i := i + 1;
        }
      }

    1. The first two lines set the local variables "i" to 0 and "s" to the current event's Symbol field.
    2. The first "while" loop walks over the group associated with the Symbol of the current event. It creates at most three new events, marked as upserts, getting the highest price first, the second highest second, and so forth.
    3. The second "while" loop handles a corner case that arises when there are deletes in the Bid stream. If the group has fewer than three entries, the loop deletes the output rows for the unused positions, so that stale bids do not linger in the output.

    The full data model in the Aleri Streaming Platform wraps the above code in a FlexStream.

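    For experimentation outside the platform, the following Python sketch reproduces the behavior of the event cache and the two loops. It is an approximation under stated assumptions, not the Aleri implementation. The class name OrderBookTop is hypothetical, and a plain dictionary keyed by Id stands in for the coalescing event cache. The symbol's group is also re-sorted on every event instead of being kept in order.

```python
from typing import Dict, List, Tuple

DEPTH = 3  # number of book positions published per symbol


class OrderBookTop:
    """Publishes the top DEPTH bids per symbol after every Bid event."""

    def __init__(self) -> None:
        # One current record per Id, standing in for the "coalesce" option.
        self.bids: Dict[int, Tuple[str, float, int]] = {}

    def on_bid(self, opcode: str, bid_id: int, symbol: str,
               price: float, shares: int) -> List[tuple]:
        if opcode == "delete":
            self.bids.pop(bid_id, None)
        else:  # insert or update replaces the bid's current values
            self.bids[bid_id] = (symbol, price, shares)
        # The group for this symbol, ordered like "Price desc".
        group = sorted((b for b in self.bids.values() if b[0] == symbol),
                       key=lambda b: b[1], reverse=True)
        out: List[tuple] = []
        # First loop: upsert positions 1..min(len(group), DEPTH).
        for i in range(min(len(group), DEPTH)):
            _, best_price, best_shares = group[i]
            out.append(("upsert", i + 1, symbol, best_price, best_shares))
        # Second loop: clear positions that no longer have a bid.
        for i in range(len(group), DEPTH):
            out.append(("safedelete", i + 1, symbol))
        return out
```

    With the IBM Bid events from the example above (Ids 1 to 5), the final call returns upserts for positions 1 to 3 with prices 46.41, 45.81, and 43.17. After a delete that leaves a symbol with a single bid, it returns one upsert followed by safedeletes for positions 2 and 3.
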
  • Using the XML Data Type for Atomic Changes

    Like most databases, the Aleri Streaming Platform allows you to manipulate structured XML data. A value of "xml" type is a tree structure such as

    <Address>
       <Name value="George Bush"/>
       <Street value="100 Dusty Road"/>
       <City value="Crawford"/>
       <State value="Texas"/>
    </Address>

    The operations are a subset of the SQL/XML standard. You can create variables that hold xml values and assign values to them, for example

    xml x1 := xmlparse('<element/>');

    and you can convert xml values to and from strings.

    Here's an example model and small data set that demonstrate one use of the xml data type: creating joins that propagate changes atomically. The model takes in events with fields

    string department
    string manager

    and events with fields

    string department
    string member

    and outputs structures like

    <department name="Software Principles">
       <member name="Elsa Gunter"/>
       <member name="Amy Felty"/>
       <member name="John Reppy"/>
    </department>

    If you look at the model in the Aleri Studio, you'll see the following diagram:

    [Figure: XML Structure]

    It has two source streams, "departments" and "members", a Join Stream "fullDept" with fields

    string department
    string member
    string manager

    an Aggregate Stream "groupDept" with fields

    string department
    string xmlStructure

    and a similar Aggregate Stream "groupMgr". These aggregations make the changes from the Join Stream atomic: the structure shown above is regenerated whenever a new department is added or a member of the department changes. Those changes are harder to see from the Join Stream alone. For example, if a member of a department changes, the Join Stream outputs only the changed row instead of the full department.

    It's instructive to look at the computation inside "groupDept". The expression for computing the above structure is

    xmlserialize(xmlelement(department, xmlattributes(fullDept.department as "name"), xmlagg(xmlelement(member, xmlattributes(fullDept.member as "name")))))

    The important operation is the "xmlagg" function, which aggregates a number of xml values into one xml value. More precisely, each row of the "fullDept" stream produces a "member" element. "xmlagg" collects these elements for the group, and the result is wrapped in a "department" element at the top level.
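    Python's standard xml.etree.ElementTree module can build the same shape, which may help when checking what the aggregate should produce. This sketch is only an analogy to xmlelement, xmlattributes, and xmlagg and does not use the Aleri functions. The name group_departments is hypothetical, and join rows are assumed to arrive as (department, member) pairs.

```python
import xml.etree.ElementTree as ET
from collections import defaultdict
from typing import Dict, List, Tuple


def group_departments(rows: List[Tuple[str, str]]) -> Dict[str, str]:
    """Serialize one <department> element per department from join rows."""
    members: Dict[str, List[str]] = defaultdict(list)
    for dept, member in rows:
        members[dept].append(member)
    result: Dict[str, str] = {}
    for dept, names in members.items():
        # Like xmlelement(department, xmlattributes(... as "name"), ...).
        root = ET.Element("department", {"name": dept})
        for name in names:
            # Like xmlagg over xmlelement(member, ...): one child per row.
            ET.SubElement(root, "member", {"name": name})
        result[dept] = ET.tostring(root, encoding="unicode")  # like xmlserialize
    return result
```

    Because the whole department element is rebuilt from all of its rows, a change to any one member yields a complete new structure. That is what makes the update atomic from a consumer's point of view.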

    Future releases will have more functionality added to the xml data type, such as searching and replacing elements using XPath expressions.

  • Filtering Events Based on Event Attributes

    We begin our exploration of CEP design patterns with basic filtering. While filtering is easy to implement with a number of non-CEP products, or with custom-built applications, this example will help us set up a framework for describing other, more complex patterns.
    [Figure: Filtering]

    The above diagram depicts a simple filter query. The query subscribes to one stream, evaluates a specified logical condition based on event attributes, and, if the condition is true, publishes the event to the destination stream. For example, an application monitoring a stream of purchase orders may filter out all orders for which Priority != 'High' and Amount < 100000.
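    In Python terms, this simplest kind of filter is a stateless generator over the event stream: each event is judged on its own attributes alone. The function name important_pos and the dict field names are illustrative assumptions. The condition keeps exactly the orders that the purchase-order example does not filter out.

```python
from typing import Any, Dict, Iterable, Iterator


def important_pos(orders: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield only orders that pass the filter, one event at a time."""
    for order in orders:
        # Drop the order only if Priority != 'High' and Amount < 100000.
        if order["Priority"] == "High" or order["Amount"] >= 100000:
            yield order
```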

    This example presents the simplest kind of filter, where events are evaluated one by one and the query condition involves the attributes of only one event. It is also possible to construct more complex filters. Such filters might compare events to other events in the same stream or in another stream, or compare events to a computed metric. For instance, a filter might capture orders where the purchase amount is larger than the previous purchase amount, or purchase amounts that are larger than the average for the previous day. Such queries are discussed later in this document.

    Sample Areas of Applicability

    Filtering is ubiquitous in CEP applications. Here are some examples:

    • Trading: a filter may be used to filter out all trades where the volume is too small, or all trades that do not refer to particular stock symbols.
    • Click-stream analysis: a filter may be used to capture the clicks that originate from a certain set of IP addresses.
    • Sensor network: a filter may be used to capture sensor readings where values fall outside of the normal range.

    CCL Examples

    Filter queries in CCL look much like SQL queries. Unlike SQL queries, however, CCL queries execute continuously. The query shown below subscribes to StreamAllPOs, continuously evaluates the condition for all events, and sends those events that match the condition to StreamImportantPOs.

    INSERT INTO StreamImportantPOs 
    SELECT * 
    FROM StreamAllPOs 
    WHERE Priority = 'High' OR Amount >= 100000;
    
  • Caching and Accessing Streaming and Database Data in Memory

    This is the first design pattern in this document in which multiple events are kept in memory. In-memory data caching is the foundation of most CEP design patterns. The cache typically stores two kinds of data:

    • Recent events from one or more streams
      Recent events are typically stored in windows. A window is an object similar to an in-memory database table. However, a window can manage its state automatically, by keeping and evicting certain events according to its policy. For example, a window policy might specify KEEP 1000 ROWS PER Id. This window maintains 1000 rows for each Id value and expires old rows as necessary.
    • Data from one or more database tables
      Just as streaming events can be cached in memory, it often makes sense to cache data from a relational database, so that different kinds of operations may be performed on this data more efficiently. This cache is typically managed according to the Least Recently Used (LRU) algorithm, or by explicit invalidation.

    [Figure: In-Memory Caching]

    Note that, although we are describing an in-memory cache here, many applications require this cache to be persistent. This means that, if a machine that hosts the CEP engine fails, the data kept in windows is not lost. This functionality is even more important when the window holds not just the last few seconds’, but minutes’, hours’, days’, and even weeks’ worth of events.

    Sample Areas of Applicability

    In-memory caching is used in every non-trivial CEP application. Here are just a few examples:

    • Trading: in a trading application, the cache may hold the values of recent trades, recent orders, or recent news events, coupled with the relevant historical and reference information.
    • Click-stream analysis: a typical application may hold the recent clicks and searches performed by the users, coupled with the relevant historical and reference information.
    • Network security: A typical application may hold recent events from firewalls, intrusion detection systems, and other devices, coupled with the relevant historical and reference information.

    CCL Examples

    In this example, we will focus on the first form of caching, namely, caching events from a data stream. The Database Lookup design pattern, presented later in this document, discusses the caching of database data.

    Windows

    -- A window to keep data for ten minutes, populated by stream StreamClicks

    CREATE WINDOW RecentHistory(IPAddress LONG, URLvisited STRING, FileSize LONG)
    KEEP 10 MINUTES;

    INSERT INTO RecentHistory
    SELECT *
    FROM StreamClicks;

    -- A window to keep the last value for each IPAddress:

    CREATE WINDOW LastValues(IPAddress LONG, URLvisited STRING, FileSize LONG)
    KEEP LAST PER IPAddress;

    -- A window that keeps the 1000 largest downloads. Each download is kept for one hour, or until a larger download displaces it from the window:

    CREATE WINDOW LargestDownloads(IPAddress LONG, URLvisited STRING, FileSize LONG)
    KEEP 1000 LARGEST BY FileSize KEEP 1 HOUR;

    Looking up Data in a Window

    -- Use the RecentHistory window defined above to check how many other records with
    -- the same IP address have been seen recently

    INSERT INTO StreamNewIP
    SELECT S.*, Count(H)
    FROM StreamClicks AS S,
        RecentHistory AS H
    WHERE S.IPAddress = H.IPAddress;
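    The following Python sketch shows the mechanics behind the RecentHistory window and its lookup. It is a time-based window that evicts old events and maintains a per-IP count, so the lookup does not rescan the window. It is a hypothetical illustration, not how the engine implements windows. Timestamps are assumed to be seconds arriving in non-decreasing order, and a click is a (timestamp, IPAddress, URLvisited, FileSize) tuple. In the same terms, a KEEP LAST PER IPAddress window would be a dictionary assignment such as last[ip] = click.

```python
from collections import deque
from typing import Deque, Dict, Tuple

Click = Tuple[float, int, str, int]  # (timestamp, IPAddress, URLvisited, FileSize)


class TimeWindow:
    """Keeps clicks for `keep` seconds and a per-IP count for fast lookups."""

    def __init__(self, keep: float) -> None:
        self.keep = keep
        self.events: Deque[Click] = deque()  # oldest first
        self.count_by_ip: Dict[int, int] = {}

    def _evict(self, now: float) -> None:
        # Drop events that have aged out and keep the counts in step.
        while self.events and self.events[0][0] <= now - self.keep:
            ip = self.events.popleft()[1]
            self.count_by_ip[ip] -= 1
            if self.count_by_ip[ip] == 0:
                del self.count_by_ip[ip]

    def insert(self, click: Click) -> None:
        self._evict(click[0])
        self.events.append(click)
        self.count_by_ip[click[1]] = self.count_by_ip.get(click[1], 0) + 1

    def count_ip(self, ip: int, now: float) -> int:
        # Lookup: how many clicks from this IP are still in the window?
        self._evict(now)
        return self.count_by_ip.get(ip, 0)
```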

  • Computing Statistical Metrics over Various Kinds of Moving Windows

    Unlike the previous design pattern, this pattern does not merely keep events in memory; it uses the stored values to compute various statistics. A typical example is computing a running average over a sliding window. (As described in the previous pattern, a window is an object that holds a set of events in memory.)
    [Figure: Aggregation Over Windows]

    This design pattern comes in quite a few flavors, differing along the following dimensions:

    • The kinds of aggregators computed
      These include running averages, sums, counts, minimum, maximum, standard deviation, user-defined aggregators, and so on.
    • The kinds of windows used
      These include time-based and count-based windows, sliding and jumping (tumbling) windows, windows that keep the specified number of largest or smallest elements, and so on.
    • Output frequency: continuous vs. periodic
      With continuous output (also called “tick-by-tick” output), each incoming event updates the calculated expression, and an output event is produced. With periodic output, the calculated expression is updated continuously but is published only periodically, for example, every ten seconds. Note that, in both cases, the expression is computed incrementally; that is, the entire window is not rescanned on each incoming event.
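    The incremental computation can be sketched in Python for a count-based sliding window. Keep a running sum, add each new value, and subtract the value that falls out of the window. This is a minimal sketch, and SlidingAverage is a hypothetical name. Calling push returns a result on every event, as tick-by-tick output would; periodic output would instead publish the latest value from a timer. With floating-point data, long-running sums can accumulate rounding error, which a production implementation might need to correct.

```python
from collections import deque
from typing import Deque


class SlidingAverage:
    """Average of the last `size` values, updated without rescanning."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.values: Deque[float] = deque()
        self.total = 0.0

    def push(self, x: float) -> float:
        self.values.append(x)
        self.total += x
        if len(self.values) > self.size:
            # Subtract the value that just left the window.
            self.total -= self.values.popleft()
        return self.total / len(self.values)
```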

    Sample Areas of Applicability

    Windows-based computations are used in a wide variety of applications. For example:

    • Trading: it is often necessary to compute “one-minute bars”: the average, maximum, minimum, and/or closing price within each one-minute interval.
    • Click-stream analysis: it is often useful to compute the number of visitors who click on a particular link within a specified time interval.
    • System management: applications may compute maximum and minimum CPU usage, memory, and Disk I/O utilization for each machine, within a specified time interval.

    CCL Examples

    -- Compute one-minute “bars” (Avg, Max, Min, Closing) and VWAP (Volume-Weighted
    -- Average Price). Output results continuously, as soon as results change.

    INSERT INTO OneMinuteBarView
    SELECT Symbol, AVG(Price), MAX(Price), MIN(Price), Price, SUM(Price*Volume)/SUM(Volume)
    FROM StreamFeed KEEP 1 MINUTE
    GROUP BY Symbol;

    -- Determine the total bandwidth consumed by each IP address for the
    -- last 1000 downloads. Output results every 15 seconds.

    INSERT INTO StreamConsumedBandwidth
    SELECT IP, SUM(FileSize)
    FROM StreamDownloadLog KEEP 1000 ROWS
    GROUP BY IP
    OUTPUT EVERY 15 SECONDS;
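    For checking results, the per-symbol bar and VWAP arithmetic from the first query can be written out in Python. This hypothetical function computes over a snapshot of the trades currently in the window, given oldest first, so the closing price is the last trade. An engine would maintain these values incrementally rather than recomputing them from scratch.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Trade = Tuple[str, float, int]  # (Symbol, Price, Volume)


def bars_and_vwap(window: List[Trade]) -> Dict[str, Dict[str, float]]:
    """Per-symbol avg/max/min/closing price and VWAP over one window."""
    by_symbol: Dict[str, List[Tuple[float, int]]] = defaultdict(list)
    for symbol, price, volume in window:
        by_symbol[symbol].append((price, volume))
    result: Dict[str, Dict[str, float]] = {}
    for symbol, rows in by_symbol.items():
        prices = [p for p, _ in rows]
        result[symbol] = {
            "avg": sum(prices) / len(prices),
            "max": max(prices),
            "min": min(prices),
            "close": prices[-1],  # last trade in the window
            # VWAP = SUM(Price*Volume) / SUM(Volume)
            "vwap": sum(p * v for p, v in rows) / sum(v for _, v in rows),
        }
    return result
```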

  • Accessing Databases to Retrieve Historical or Reference Context for Incoming Events

    While there are applications that deal exclusively with real-time events, most useful applications refer to historical data or reference data to enrich the incoming events. The following diagram shows how:

    • An event comes into the system.
    • The engine issues an SQL request to the database and passes a key (from the event) as a parameter to the database query.
    • The database returns a result.
    • The engine combines the result with data from the event, and forwards the enriched event to the next query for further processing.

    [Figure: Database Lookups]

    In SQL terminology, this design pattern implements a join between an incoming data stream and a database table.

    The more advanced implementations of this design pattern often require:

    • Caching
      High throughput is difficult to achieve if the CEP engine must query the database on every incoming event. Thus, a good caching layer is critical to performance.
    • Granular caching and access
      Sometimes it is acceptable to cache entire database tables in memory. Such a paradigm works for small database tables, but does not scale well if many large tables need to be accessed. For example, if an RFID application is trying to look up information about a specific RFID tag, caching the entire database table is often not possible. Accessing and caching specific rows is required for these applications.
    • Concurrent lookups
      For some applications, it is acceptable to issue serial calls to the database, blocking the processing of events until the database returns results. For others, concurrent lookups that do not block the entire system are a must.

    Sample Areas of Applicability

    This design pattern is widely applicable. For example:

    • Trading: a trading application may look up historical prices for a stock, certain information about an order, or rules and regulations stored in a database.
    • RFID application: an application may look up information about a pallet or case, identified by its tag ID, or information about the reader that reported the tag. An application may also check where the object should be located, according to the plan stored in the database, and compare this location to the actual location of the object.
    • Network security: when deciding how serious an alert is, it may be necessary to refer to other alerts related to the same IP address.

    CCL Examples

    In these examples, the SQL lookup code (delimited by [[ ... ]]) is executed in the database, not in the Coral8 Engine.


    -- Look up the SKU and the name of the product for each RFID event, via the Tag ID.
    INSERT INTO
    StreamRFIDEventsEnriched
    SELECT
    StreamRFIDEvents.TagID, DbResult.SKU, DbResult.ProductName
    FROM StreamRFIDEvents,
    (DATABASE "OracleDb"
    SCHEMA "schemas/product_info.ccs"
    -- SQL Code to perform lookup:
    [[SELECT sku, product_name
    FROM product_info pi
    WHERE pi.tag_id = ?StreamRFIDEvents.TagID]]) AS DbResult;

    -- Look up yesterday’s closing prices for each incoming trade.

    INSERT INTO
    StreamTradesEnriched
    SELECT
    InTrades.Symbol, InTrades.Price, DbResult.ClosingPrice
    FROM StreamTrades AS InTrades,
    (DATABASE "OracleDb"
    SCHEMA "schemas/closing-price.ccs"
    -- SQL Code to perform lookup:
    [[SELECT closing_price
    FROM price_history ph
    WHERE ph.symbol = ?InTrades.Symbol
    AND ph.closing_date > current_date-2
    AND ph.closing_date <= current_date-1]]
    ) AS DbResult;

  • Sending Raw or Derived Events to a Relational Database

    While the CEP Engine can store large volumes of events in windows, it is often necessary to write raw or processed (filtered, aggregated, correlated, and so on) events into a traditional relational database. A relational database can manage very large volumes of data for very long periods of time, and it also supports a number of interfaces that other applications can use to retrieve the data. This design pattern illustrates the complementary nature of databases and CEP engines.
    [Figure: Database Writes]

    Note that, if the database must store large volumes of events, this design pattern may call for a number of advanced techniques, such as batching, asynchronous writing (to avoid blocking), queuing (to handle spikes), concurrent writes, writing via native database interfaces, and so on.
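    Batching is the simplest of these techniques to illustrate. The sketch below uses Python's built-in sqlite3 module as a stand-in for Oracle; this is an assumption, and BatchWriter is a hypothetical name. The table layout mirrors the one_minute_summary example below. Rows are buffered and written with executemany in one transaction per batch. Asynchronous writing and queuing are not shown.

```python
import sqlite3
from typing import List, Tuple

# (symbol, closing_price, avg_price, max_price, min_price, vwap)
Row = Tuple[str, float, float, float, float, float]


class BatchWriter:
    """Buffers summary rows and writes them to the database in batches."""

    def __init__(self, conn: sqlite3.Connection, batch_size: int = 500) -> None:
        self.conn = conn
        self.batch_size = batch_size
        self.buffer: List[Row] = []

    def write(self, row: Row) -> None:
        self.buffer.append(row)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        with self.conn:  # commits the whole batch as one transaction
            self.conn.executemany(
                "INSERT INTO one_minute_summary VALUES (?, ?, ?, ?, ?, ?)",
                self.buffer)
        self.buffer.clear()
```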

    Sample Areas of Applicability

    This design pattern cuts across a wide range of applications, such as:

    • Trading: writing one-minute bars (maximums, minimums, and the closing price for each one-minute interval) into the database.
    • Click-stream analysis: storing the raw click-stream history, together with derived data, in the database.
    • Network security: storing new relevant security events in the database.

    CCL Examples

    -- Store aggregated information, calculated every minute, based on a 10-minute
    -- sliding window, into an Oracle database table called one_minute_summary.

    EXECUTE STATEMENT
        DATABASE "MyOracleDb"
    [[
        -- SQL code to perform an INSERT. Can be SQL or PL/SQL or T-SQL
        INSERT INTO one_minute_summary
        VALUES(?symbol, ?closing_price, ?avg_price, ?max_price, ?min_price, ?vwap);
    ]]
    SELECT Symbol AS symbol,
        Price AS closing_price,
        AVG(Price) AS avg_price,
        MAX(Price) AS max_price,
        MIN(Price) AS min_price,
        SUM(Price*Volume)/SUM(Volume) AS vwap
    FROM StreamFeed KEEP 10 MINUTES
    GROUP BY Symbol
    OUTPUT EVERY 1 MINUTE;

  • Joining Multiple Event Streams

    While simple applications often look at just one stream at a time, most advanced applications must look at and correlate events across multiple streams. A join in a CEP application shares many characteristics with a join in SQL.
    [Figure: Correlations (Joins)]

    In CEP, a join between two data streams necessarily involves one or more windows. Streams do not store events; they pass events to, from, and between queries. To perform a join, some events must be stored in memory so that they can be matched with events that arrive later on the other stream. This is what a window does. The above diagram depicts a stream-to-window join. Events arriving in Stream 2 are stored in the window. Events arriving in Stream 1 are joined with events stored in the window, and the matching pairs are published by the join. See the CCL Examples section for specific examples.

    Other kinds of interesting CEP joins include:

    • Window to Window joins
      Joins where data from multiple streams is retained by the windows and is incrementally joined.
    • Outer joins
      These joins are similar to SQL outer joins, and are surprisingly useful in CEP applications. For example, when joining a stream with a window, it may be necessary to produce results, regardless of whether or not a match is found. An outer join is needed to perform this function.
    • Stream to database joins
      These joins relate event streams to data stored in a database. Such joins generalize the ideas presented in the Database Lookup design pattern.
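    The stream-to-window join described above can be sketched in Python as follows. This is a simplified, hypothetical illustration, not the engine's algorithm. StreamWindowJoin is an invented name, events are dicts joined on Symbol, and stream-2 events are assumed to arrive in timestamp order. Matching is a linear scan, whereas real engines index the window. Setting outer=True gives left-outer-join behavior, emitting None where SQL would produce NULLs.

```python
from collections import deque
from typing import Deque, List, Optional, Tuple


class StreamWindowJoin:
    """Joins stream-1 events against stream-2 events kept for `keep` seconds."""

    def __init__(self, keep: float, outer: bool = False) -> None:
        self.keep = keep
        self.outer = outer
        self.window: Deque[Tuple[float, dict]] = deque()  # stream-2 events, oldest first

    def on_feed2(self, ts: float, event: dict) -> None:
        self.window.append((ts, event))  # stored, not published

    def on_feed1(self, ts: float, event: dict) -> List[Tuple[dict, Optional[dict]]]:
        # Expire stream-2 events older than the window.
        while self.window and self.window[0][0] <= ts - self.keep:
            self.window.popleft()
        matches: List[Tuple[dict, Optional[dict]]] = [
            (event, e2) for _, e2 in self.window if e2["Symbol"] == event["Symbol"]]
        if not matches and self.outer:
            matches = [(event, None)]  # left outer join: no match found
        return matches
```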

    Joins can be very CPU-intensive even at low data rates, so most CEP joins require heavy indexing. The indices used in CEP applications are similar to the ones used in relational databases, but there is at least one important difference: CEP indices must be highly dynamic. For example, if a window that stores the last 10 minutes' worth of data is indexed, the index must be updated both when new events enter the window and when old events expire from the window. At 10,000 events per second, once the window is full, that's 20,000 index updates per second (10,000 insertions plus 10,000 expirations)! A good CEP engine creates proper indices for windows without requiring the user to specify them.
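    The index-maintenance cost shows up clearly in a small Python sketch of an indexed time-based window. IndexedWindow is a hypothetical name, and a hash index on one field is an assumption. Every insert adds an index entry and every expiry removes one, so once the window is full each incoming event causes two index updates. That matches the arithmetic in the paragraph above.

```python
from collections import defaultdict, deque
from typing import Any, DefaultDict, Deque, Dict, List, Tuple

Event = Dict[str, Any]


class IndexedWindow:
    """Time-based window with a hash index on one field, kept in sync on
    every insert and every expiry."""

    def __init__(self, keep: float, key: str) -> None:
        self.keep = keep
        self.key = key
        self.events: Deque[Tuple[float, Event]] = deque()  # oldest first
        self.index: DefaultDict[Any, Deque[Event]] = defaultdict(deque)
        self.index_updates = 0

    def _expire(self, now: float) -> None:
        while self.events and self.events[0][0] <= now - self.keep:
            _, old = self.events.popleft()
            bucket = self.index[old[self.key]]
            bucket.popleft()  # per-key order matches arrival order, so this is `old`
            if not bucket:
                del self.index[old[self.key]]
            self.index_updates += 1

    def insert(self, ts: float, event: Event) -> None:
        self._expire(ts)
        self.events.append((ts, event))
        self.index[event[self.key]].append(event)
        self.index_updates += 1

    def lookup(self, value: Any, now: float) -> List[Event]:
        self._expire(now)
        return list(self.index.get(value, ()))
```

    Feeding one event per second into a 10-second window for 30 seconds performs 30 insertions and 20 expirations, for 50 index updates in total.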

    Sample Areas of Applicability

    Most sophisticated CEP applications employ joins. Here are some examples:

    • Trading: correlating information from multiple exchanges to find arbitrage opportunities.
    • Business process monitoring: correlating information from multiple systems that participate in a business process to manage and track exceptions.
    • Network security: correlating information across different security devices and applications for sophisticated intrusion detection and response.

    CCL Examples

    -- Stream to Window Join: keep a 10 second window on stream StreamFeed2,
    -- and join all the events from StreamFeed1 with this window.

    INSERT INTO StreamMatches
    SELECT StreamFeed1.*, StreamFeed2.*
    FROM StreamFeed1,
        StreamFeed2 KEEP 10 SECONDS
    WHERE StreamFeed1.Symbol = StreamFeed2.Symbol;

    -- Same as above, but perform a left outer join: produces outputs
    -- (containing some NULLs) even if no matches are found.

    INSERT INTO StreamMatches
    SELECT StreamFeed1.*, StreamFeed2.*
    FROM
        StreamFeed1
        LEFT OUTER JOIN
        StreamFeed2 KEEP 10 SECONDS
    ON
        StreamFeed1.Symbol = StreamFeed2.Symbol;

    -- Window to Window join: keep windows on both streams. This example correlates
    -- alerts from a Virus Checker System and an Intrusion Detection System.

    INSERT INTO StreamAlertCommonIP
    SELECT StreamIDSAlerts.IP
    FROM
        StreamIDSAlerts KEEP 10 MINUTES,
        StreamVirusCheckAlerts KEEP 10 MINUTES
    WHERE
        StreamIDSAlerts.IP = StreamVirusCheckAlerts.IP;