Siguiendo con el uso de ventanas en flink, vamos a modelar la generación de eventos para poder detectar que un stream concreto se está desviando de su funcionamiento normal. Por ejemplo vamos suponer que tenemos un stream que emite un evento cada vez que una persona añade un artículo a un carrito de la compra en una página web, y queremos detectar posibles problemas en el proceso. Para ello podemos hacer lo siguiente:
Con esto tendremos un sistema, claramente invalido para producción por su simplicidad, que nos permite detectar problemas en el sistema original.
Para resolver esto usando flink, bastat con implementar la siguiente clase java. Para la parte de la reegresión lienal hemos usado la libería math de apache
package net.zylk.flink.window; import java.util.concurrent.TimeUnit; import org.apache.commons.math3.stat.regression.SimpleRegression; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector; import net.zylk.flink.window.trigger.CountTimeTrigger; public class NumberOfElementsGroupByKey { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); NumberOfElementsGroupByKey.addJob(env); env.execute(); env.execute("Trace :: Kafka2HBase Flink Job"); } public static void addJob(StreamExecutionEnvironment env) throws Exception { // set up the streaming execution environment Integer MEASURE_PERIOD = 1; //Numero de segundos para la agrupar la medida Integer NUMBER_OF_POINTS = 10; // Numero de elementos a tener en cuenta en la extrapolacion Double MIN_CORRELATION = 0.98; // Correlacion mínima necesaria para considerar que las medidas pueden genera una preducción útil DataStream<String> dataStream = env.socketTextStream("localhost", 9999); //devuelve el máximo de los tres números que entren o el máximo en 10 segundos dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).trigger(CountTimeTrigger.of(3)).max(0).print(); KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = dataStream.filter(new FilterFunction<String>() { @Override public boolean filter(String message) throws Exception { if (message != null) return true; else return false; } }).name("Filter by service and app").map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String,Integer> map(String value) throws Exception { return new Tuple2<String, Integer>(value,1); } }).keyBy(0); keyedStream.timeWindow(Time.of(MEASURE_PERIOD, TimeUnit.SECONDS)).sum(2).keyBy(0,1).countWindow(NUMBER_OF_POINTS,1).apply(new WindowFunction<Tuple2<String,Integer>, Boolean, Tuple, GlobalWindow>() { @Override public void apply(Tuple key, GlobalWindow window, Iterable<Tuple2<String, Integer>> in, Collector<Boolean> out) throws Exception { int n = 0; Boolean a = true; SimpleRegression regression = new SimpleRegression(); for (Tuple2<String, Integer> t : in) { if(n < NUMBER_OF_POINTS -1 ) { regression.addData(new Double(n).doubleValue(),new Double(t.f1).doubleValue()); } else { if(regression.getR() > MIN_CORRELATION) { double p = regression.predict(new Double(n).doubleValue()); a = (p - regression.getSumSquaredErrors()) > t.f1 && t.f1 < (p + regression.getSumSquaredErrors()); // si la medida está en el intervalo de la prediccion.. } } n++; } out.collect(a); } }); } }
Este ejemplo no prentende ser un ejemplo de modelado predictivo, flink dispone de sus propias librerías para ML, solo pretender ilustrar el uso de las ventanas en flink.