Aplicando una regresion lineal a un stream
Siguiendo con el uso de ventanas
en flink, vamos a modelar la generación de eventos para poder
detectar que un stream concreto se está desviando de su funcionamiento
normal. Por ejemplo vamos suponer que tenemos un stream que emite un
evento cada vez que una persona añade un artículo a un carrito de la
compra en una página web, y queremos detectar posibles problemas en el
proceso. Para ello podemos hacer lo siguiente:
- Consumir dicho stream desde flink
- Crear una ventana que nos sume el número de eventos que se
producen en un perido de tiempo determinado (1 segundo por ejemplo) - Crear una segunda venta que nos calcule la regresión lienal de los
últimos n-1 elementos y compare la predicción con el elemento n - Emitir un nuevo stream que devuelva true si los valores obtenidos
están dentro del rango esperado y false si no lo están
Con esto tendremos un sistema, claramente invalido para producción
por su simplicidad, que nos permite detectar problemas en el sistema original.
Para resolver esto usando flink, bastat con implementar la siguiente
clase java. Para la parte de la reegresión lienal hemos usado la
libería math
de apache
package net.zylk.flink.window; import java.util.concurrent.TimeUnit; import org.apache.commons.math3.stat.regression.SimpleRegression; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.util.Collector; import net.zylk.flink.window.trigger.CountTimeTrigger; public class NumberOfElementsGroupByKey { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); NumberOfElementsGroupByKey.addJob(env); env.execute(); env.execute("Trace :: Kafka2HBase Flink Job"); } public static void addJob(StreamExecutionEnvironment env) throws Exception { // set up the streaming execution environment Integer MEASURE_PERIOD = 1; //Numero de segundos para la agrupar la medida Integer NUMBER_OF_POINTS = 10; // Numero de elementos a tener en cuenta en la extrapolacion Double MIN_CORRELATION = 0.98; // Correlacion mínima necesaria para considerar que las medidas pueden genera una preducción útil DataStream<String> dataStream = env.socketTextStream("localhost", 9999); //devuelve el máximo de los tres números que entren o el máximo en 10 segundos dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).trigger(CountTimeTrigger.of(3)).max(0).print(); KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = dataStream.filter(new FilterFunction<String>() { @Override public boolean filter(String message) throws Exception { if (message != null) return true; else return false; } }).name("Filter by service and app").map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String,Integer> map(String value) throws Exception { return new Tuple2<String, Integer>(value,1); } }).keyBy(0); keyedStream.timeWindow(Time.of(MEASURE_PERIOD, TimeUnit.SECONDS)).sum(2).keyBy(0,1).countWindow(NUMBER_OF_POINTS,1).apply(new WindowFunction<Tuple2<String,Integer>, Boolean, Tuple, GlobalWindow>() { @Override public void apply(Tuple key, GlobalWindow window, Iterable<Tuple2<String, Integer>> in, Collector<Boolean> out) throws Exception { int n = 0; Boolean a = true; SimpleRegression regression = new SimpleRegression(); for (Tuple2<String, Integer> t : in) { if(n < NUMBER_OF_POINTS -1 ) { regression.addData(new Double(n).doubleValue(),new Double(t.f1).doubleValue()); } else { if(regression.getR() > MIN_CORRELATION) { double p = regression.predict(new Double(n).doubleValue()); a = (p - regression.getSumSquaredErrors()) > t.f1 && t.f1 < (p + regression.getSumSquaredErrors()); // si la medida está en el intervalo de la prediccion.. } } n++; } out.collect(a); } }); } }
Este ejemplo no prentende ser un ejemplo de modelado predictivo, flink
dispone de sus propias librerías para ML, solo pretender
ilustrar el uso de las ventanas en flink.