Flink procesado de streams ii

Aplicando una regresion lineal a un stream

Siguiendo con el uso de ventanas en flink,  vamos a modelar la generación de eventos para poder detectar que un stream concreto se está desviando de su funcionamiento normal. Por ejemplo vamos suponer que tenemos un stream que emite un evento cada vez que una persona añade un artículo a un carrito de la compra en una página web, y queremos detectar posibles problemas en el proceso. Para ello podemos hacer lo siguiente:

  • Consumir dicho stream desde flink
  • Crear una ventana que nos sume el número de eventos que se producen en un perido de tiempo determinado (1 segundo por ejemplo)
  • Crear una segunda venta que nos calcule la regresión lienal de los últimos n-1 elementos y compare la predicción con el elemento n
  • Emitir un nuevo stream que devuelva true si los valores obtenidos están dentro del rango esperado y false si no lo están

Con esto tendremos un sistema, claramente invalido para producción por su simplicidad, que nos permite detectar problemas en el sistema original.

 

Para resolver esto usando flink, bastat con implementar la siguiente clase java. Para la parte de la reegresión lienal hemos usado la libería math de apache

package net.zylk.flink.window;

import java.util.concurrent.TimeUnit;

import org.apache.commons.math3.stat.regression.SimpleRegression;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import net.zylk.flink.window.trigger.CountTimeTrigger;

public class NumberOfElementsGroupByKey {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        NumberOfElementsGroupByKey.addJob(env);
        env.execute();
        
        env.execute("Trace :: Kafka2HBase Flink Job");
    }

    public static void addJob(StreamExecutionEnvironment env) throws Exception {
        // set up the streaming execution environment

        Integer MEASURE_PERIOD = 1; //Numero de segundos para la agrupar la medida
        Integer NUMBER_OF_POINTS = 10; // Numero de elementos a tener en cuenta en la extrapolacion
        Double MIN_CORRELATION = 0.98; // Correlacion mínima necesaria para considerar que las medidas pueden genera una preducción útil
        
        DataStream<String> dataStream = env.socketTextStream("localhost", 9999);
        //devuelve el máximo de los tres números que entren o el máximo en 10 segundos
        dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).trigger(CountTimeTrigger.of(3)).max(0).print();

        
        
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream =  dataStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String message) throws Exception {
                if (message != null)
                    return true;
                else
                    return false;
            }
        }).name("Filter by service and app").map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String,Integer> map(String value) throws Exception {
                return new Tuple2<String, Integer>(value,1);
            }
        }).keyBy(0);
        
        
        
    
        keyedStream.timeWindow(Time.of(MEASURE_PERIOD, TimeUnit.SECONDS)).sum(2).keyBy(0,1).countWindow(NUMBER_OF_POINTS,1).apply(new WindowFunction<Tuple2<String,Integer>, Boolean, Tuple, GlobalWindow>() {
            @Override
            public void apply(Tuple key, GlobalWindow window, Iterable<Tuple2<String, Integer>> in, Collector<Boolean> out) throws Exception {
                int n = 0;
                Boolean a  = true;
                SimpleRegression regression = new SimpleRegression();
                for (Tuple2<String, Integer> t : in) {
                    if(n < NUMBER_OF_POINTS -1 ) {
                        regression.addData(new Double(n).doubleValue(),new Double(t.f1).doubleValue());
                    } else {
                        if(regression.getR() > MIN_CORRELATION) {
                            double p = regression.predict(new Double(n).doubleValue());
                            a = (p - regression.getSumSquaredErrors()) > t.f1 && t.f1 < (p + regression.getSumSquaredErrors()); // si la medida está en el intervalo de la prediccion..
                        }
                    }
                    n++;
                }
                out.collect(a);
            }
        });
    }
}

 

Este ejemplo no prentende ser un ejemplo de modelado predictivo, flink dispone de sus propias librerías para ML, solo pretender ilustrar el uso de las ventanas en flink.

00

Más entradas de blog

thumbnail

Añadir comentarios