Let’s take a look at the simple POJO market data event:
public class MarketDataEvent {
private String symbol;
private BigDecimal price;
//getters and setters
}
All we need for the stock price ticker is the stock symbol and the price. As we already mentioned, in order to keep the precision of the price, the price property will be of type BigDecimal.
To sent the MarketDataEven objects to Esper engine we are going to use EsperTemplate – component of the OpenCredo Esper Extension open source project with the goal to simplify configuration and usage of Esper CEP engine. You can explore and download OpenCredo Esper Extension here. First we need to make sure we have the Esper XML configuration file on the project classpath. The basic empty Esper configuration will look something like this:
<esper-configuration xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.espertech.com/schema/esper"
xsi:schemaLocation="http://www.espertech.com/schema/esper
http://www.espertech.com/schema/esper/esper-configuration-4.5.xsd">
</esper-configuration>
Once we instantiated EsperTemplate, we will add an EPL statement we want to process as well as UpdateListener which will get notified whenever statement yields any results. Here is the code that we need to implement to achieve all that:
EsperTemplate esperTemplate = new EsperTemplate(); #1
esperTemplate.setConfiguration(new ClassPathResource("/esper-configuration.xml")); #2
EsperStatement statement = new EsperStatement( #3
"select symbol,avg(price) as avgPrice
from
com.opencredo.sandbox.aleksav.esper.domain.MarketDataEvent.win:time(30 second)
group by symbol");
SimpleListener listener = new SimpleListener();
statement.addListener(listener); #4
esperTemplate.addStatement(statement); #5
esperTemplate.initialize(); #6
#1 Instantiate EsperTemplate#2 Specify the Esper configuration XML file#3 Add an EPL statement that will calculate average price per symbol during last 30 seconds of the MarketDataEvent even stream#4 Add Esper listener which will be notified whenever statement produces results#5 Add statement to the instantiated EsperTemplate#6 Finally, initially EsperTemplate and the underlying Esper runtime Output from the EPL statement registered above will be series of events with stock symbol and the average of that stock price during last 30 seconds of trading. Whenever the statement matches new event entering or exiting the window, it will trigger execution and notify the configured listener about the change. Let’s take a look at our listener code:
public class SimpleListener implements UpdateListener{ #1
private BigDecimal averagePrice; #2
@Override
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
if(newEvents.length > 0){
this.averagePrice =
new BigDecimal(newEvents[newEvents.length-1].get("avgPrice").toString()); #3
}
}
public BigDecimal getAveragePrice() {
return averagePrice;
}
}
#1 We are implementing Esper’s UpdateListener#2 We store the last calculated average price as field member#3 We set the average price property to the calculated value of the last execution event Because of the asynchronous nature of event stream processing with Esper, listener can be invoked with the number of statement executions at the same time. Since we’re only interested in the latest average price, we only interrogate the last of the passed events from the array of EventBeans passed as the method argument (#3).So let’s now write a test to demonstrate the work of Esper engine with configured statement and its listener.
public class AllRoundIntegrationTest {
EsperTemplate esperTemplate;
SimpleListener listener = new SimpleListener();
@Before
public void setup() { #1
esperTemplate = new EsperTemplate();
esperTemplate.setConfiguration(
new ClassPathResource("/esper-configuration.xml"));
EsperStatement statement =
new EsperStatement(
"select symbol,avg(price) as avgPrice
from com.opencredo.sandbox.aleksav.esper.domain.MarketDataEvent.win:time(30 second)
group by symbol");
statement.addListener(listener);
esperTemplate.addStatement(statement);
esperTemplate.initialize();
}
@Test
public void testIndefinitePrecision() throws InterruptedException { #2
esperTemplate.sendEvent(new MarketDataEvent("ACME", new BigDecimal(5.0))); #3
esperTemplate.sendEvent(new MarketDataEvent("ACME", new BigDecimal(3.0)));
esperTemplate.sendEvent(new MarketDataEvent("ACME", new BigDecimal(2.0)));
Thread.sleep(7000);
assertEquals("Must have correct average price", #4
new BigDecimal("3.33"),
listener.getAveragePrice());
}
#1 – Setup Esper runtime and EsperTemplate#2 – Test method#3 – Push 4 POJO events to Esper engine#4 – Assert that the average is calculated correctly Using JUnit’s @Before annotation we setup the Esper environment, register statement and listener all in the setup() method (#1) before implementing the test case (#2). We send 4 events to Esper, all for the same stock symbol (ACME), but with different values. With the help of the standard calculator, we determined that the expected average should be (5.0+3.0+2.0)/3 = 3.333333…. and that’s what we try assert for in the test (#4).We run the test now, and instead of green pass, we get test failure:
java.lang.ArithmeticException: Non-terminating decimal expansion; no exact representable decimal result.
at java.math.BigDecimal.divide(BigDecimal.java:1603)
at com.espertech.esper.epl.agg.BigDecimalAvgAggregator.getValue(BigDecimalAvgAggregator.java:80)
at com.espertech.esper.epl.agg.AggSvcGroupByRefcountedNoAccessImpl.getValue(AggSvcGroupByRefcountedNoAccessImpl.java:148)
How can we explain this? The result of the BigDecimal.divide() method must be representable as a decimal result. In our case 3.33333…. has indefinite number of digits after decimal space and it cannot be represented as a decimal digit without rounding – so the ArithmeticException is thrown. Unfortunatelly Esper avg aggregation function for BigDecimal values does not take this into account, so cannot make Esper round the result of the average calculation.
What we can do is implement our own, custom aggregation function and register it with Esper engine. All we need to do is to extend the abstract com.espertech.esper.epl.agg.AggregationSupport class and implement the missing methods. Here is how our own BigDecimalRoundingAverageAggregator class would look like:
public class BigDecimalRoundingAverageAggregator extends AggregationSupport {
private BigDecimal sum = BigDecimal.ZERO;
private long numDataPoints;
private int scale = 2; #1
private RoundingMode roundingMode = RoundingMode.HALF_EVEN; #2
public void clear() {
sum = BigDecimal.ZERO;
numDataPoints = 0;
}
public void enter(Object object) { #3
if (object == null) {
return;
}
numDataPoints++;
if (object instanceof BigDecimal) {
sum = sum.add((BigDecimal) object);
} else if (object instanceof Number) {
sum = sum.add(new BigDecimal(((Number) object).doubleValue()));
}else{
throw new RuntimeException("Must be a number");
}
}
public void leave(Object object) { #4
if (object == null) {
return;
}
numDataPoints--;
if (object instanceof BigDecimal) {
sum = sum.add((BigDecimal) object);
} else if (object instanceof Number) {
sum = sum.add(new BigDecimal(((Number) object).doubleValue()));
}else{
throw new RuntimeException("Must be a number");
}
}
public Object getValue() {
if (numDataPoints == 0) {
return null;
}
return sum.divide(new BigDecimal(numDataPoints), scale, roundingMode); #5
}
public Class getValueType() {
return BigDecimal.class;
}
@Override
public void validate(AggregationValidationContext validationContext) { #6
for (Class clazz : validationContext.getParameterTypes()) {
if (!clazz.isAssignableFrom(BigDecimal.class) && !clazz.isAssignableFrom(Number.class)) {
throw new RuntimeException("Argument must be either BigDecimal or Number");
}
}
}
}
#1 Set scale to use for result rounding#2 Set rounding mode to use for result rounding#3 enter() method is invoked whenever new event enters the time window#4 leave() method is invoked whenever event leaves the time window (expires)#5 Perform division using provided scale and rounding mode#6 validate() method checks whether argument type match the aggregator allowed types at EPL statement compile time The key improvement on the Esper’s original big decimal aggregator, is that the final division to calculate average is rounding using configured scale and rounding mode (#5). In addition to that, our implementation handles mixing BigDecimal and any Number instances like Long, Double, Integer… (#3, #4)Final step is to map this aggregation function implementation with the function name to use in EPL statements. To achieve that all we have to do is add the mapping to the Esper configuration XML file:
<plugin-aggregation-function name="avgRound"
function-class="com.opencredo.sandbox.aleksav.esper.agg.BigDecimalRoundingAverageAggregator" />
We cannot overrride existing function names, so we gave our new function name avgRound. Before we run the test we need to change the function name in the EPL statement:
EsperStatement statement =
new EsperStatement(
"select symbol,avgRound(price) as avgPrice
from com.opencredo.sandbox.aleksav.esper.domain.MarketDataEvent.win:time(30 second)
group by symbol");
If you rerun the test, you will see the calming green bar in your IDE, with “All tests passed” message.We made a quick fix for an annoying problem by adding custom aggregation function to Esper runtime in no time. Interesting thing to note is that EPL keywords used within the custom aggregation function will be processed and applied automatically by Esper, so for example we can do the following:select symbol, avgRound(distict price) from MarketDataEventand the distinct keyword will make sure only unique prices would be fed into avgRound function.
Aggregation extensions are very powerful feature of Esper. If used wisely, it enables you do implement even the most complex scenarios for complex event processing.
This blog is written exclusively by the OpenCredo team. We do not accept external contributions.