Open IT Experts for Enterprise

Zylk empresa de desarrollo de ecommerce

Flink procesado de streams ii

Gustavo Fernández
Gustavo Fernández
Blog Gus Flink

Aplicando una regresion lineal a un stream

Siguiendo con el uso de ventanas
en flink
,  vamos a modelar la generación de eventos para poder
detectar que un stream concreto se está desviando de su funcionamiento
normal. Por ejemplo vamos suponer que tenemos un stream que emite un
evento cada vez que una persona añade un artículo a un carrito de la
compra en una página web, y queremos detectar posibles problemas en el
proceso. Para ello podemos hacer lo siguiente:

  • Consumir dicho stream desde flink
  • Crear una ventana que nos sume el número de eventos que se
    producen en un perido de tiempo determinado (1 segundo por ejemplo)
  • Crear una segunda venta que nos calcule la regresión lienal de los
    últimos n-1 elementos y compare la predicción con el elemento n
  • Emitir un nuevo stream que devuelva true si los valores obtenidos
    están dentro del rango esperado y false si no lo están

Con esto tendremos un sistema, claramente invalido para producción
por su simplicidad, que nos permite detectar problemas en el sistema original.

Para resolver esto usando flink, bastat con implementar la siguiente
clase java. Para la parte de la reegresión lienal hemos usado la
libería math
de apache

package net.zylk.flink.window;

import java.util.concurrent.TimeUnit;

import org.apache.commons.math3.stat.regression.SimpleRegression;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import net.zylk.flink.window.trigger.CountTimeTrigger;

public class NumberOfElementsGroupByKey {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        NumberOfElementsGroupByKey.addJob(env);
        env.execute();
        
        env.execute("Trace :: Kafka2HBase Flink Job");
    }

    public static void addJob(StreamExecutionEnvironment env) throws Exception {
        // set up the streaming execution environment

        Integer MEASURE_PERIOD = 1; //Numero de segundos para la agrupar la medida
        Integer NUMBER_OF_POINTS = 10; // Numero de elementos a tener en cuenta en la extrapolacion
        Double MIN_CORRELATION = 0.98; // Correlacion mínima necesaria para considerar que las medidas pueden genera una preducción útil
        
        DataStream<String> dataStream = env.socketTextStream("localhost", 9999);
        //devuelve el máximo de los tres números que entren o el máximo en 10 segundos
        dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).trigger(CountTimeTrigger.of(3)).max(0).print();

        
        
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream =  dataStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String message) throws Exception {
                if (message != null)
                    return true;
                else
                    return false;
            }
        }).name("Filter by service and app").map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String,Integer> map(String value) throws Exception {
                return new Tuple2<String, Integer>(value,1);
            }
        }).keyBy(0);
        
        
        
    
        keyedStream.timeWindow(Time.of(MEASURE_PERIOD, TimeUnit.SECONDS)).sum(2).keyBy(0,1).countWindow(NUMBER_OF_POINTS,1).apply(new WindowFunction<Tuple2<String,Integer>, Boolean, Tuple, GlobalWindow>() {
            @Override
            public void apply(Tuple key, GlobalWindow window, Iterable<Tuple2<String, Integer>> in, Collector<Boolean> out) throws Exception {
                int n = 0;
                Boolean a  = true;
                SimpleRegression regression = new SimpleRegression();
                for (Tuple2<String, Integer> t : in) {
                    if(n < NUMBER_OF_POINTS -1 ) {
                        regression.addData(new Double(n).doubleValue(),new Double(t.f1).doubleValue());
                    } else {
                        if(regression.getR() > MIN_CORRELATION) {
                            double p = regression.predict(new Double(n).doubleValue());
                            a = (p - regression.getSumSquaredErrors()) > t.f1 && t.f1 < (p + regression.getSumSquaredErrors()); // si la medida está en el intervalo de la prediccion..
                        }
                    }
                    n++;
                }
                out.collect(a);
            }
        });
    }
}

Este ejemplo no prentende ser un ejemplo de modelado predictivo, flink
dispone de sus propias librerías para ML
, solo pretender
ilustrar el uso de las ventanas en flink.

Si te ha parecido interesante comparte este post en RRS

Facebook
LinkedIn
Telegram
Email

Leer más sobre temas relacionados

Deja un comentario

Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *