Flink procesado de streams i

Procesado streams con ventanas personalizadas usando Flink

En un proyecto, en el que estamos involucrados y en el que estamos usando flink para el procesado de streams, hemos tenido que investigar como desarrollar una ventana de procesado personalizada. En este caso una ventana muy sencilla que consiste en disparar los eventos bajo dos circuntancias

  • Cuando han entrado un número de eventos concreto
  • Cuando ha pasado un tiempo determinado

En realidad es simplemente una ventan que conjuga las dos ventanas típicas que flink ofrece por defecto. Para hacer esto basta con desarrollar el siguiente trigger de flink

package net.zylk.flink.window.trigger;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

@PublicEvolving
public class CountTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);


    private CountTimeTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        //actualizo la ventana de tiempo
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE_AND_PURGE; //porque no quiero que los vuelva a procesar y así no tengo que hacer un trigger tipo purge (nested)
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public String toString() {
        return "CountTimeTrigger(" +  maxCount + ")";
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count.
     *
     * @param maxCount The count of elements at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> CountTimeTrigger<W> of(long maxCount) {
        return new CountTimeTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }

    }
}

Y usarlo de la siguiente manera desde flink

package net.zylk.flink.window;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import net.zylk.flink.window.trigger.CountTimeTrigger;

public class CountTimeWindowJob {

    public static void main(String[] args) throws Exception {
        //nc -lk 9999 comando de unix que levanta el servidor para mandar info
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.socketTextStream("localhost", 9999);
        //devuelve el máximo de los tres números que entren o el máximo en 10 segundos
        dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).trigger(CountTimeTrigger.of(3)).max(0).print();
        env.execute();
    }
    
}

 

00

Más entradas de blog

thumbnail

Añadir comentarios