Procesado streams con ventanas personalizadas usando Flink
En un proyecto, en el que estamos involucrados y en el que estamos
usando flink para el procesado de streams, hemos tenido que investigar
como desarrollar una ventana de procesado personalizada. En este caso
una ventana muy sencilla que consiste en disparar los eventos bajo dos circuntancias
- Cuando han entrado un número de eventos concreto
- Cuando ha pasado un tiempo determinado
En realidad es simplemente una ventan que conjuga las dos ventanas
típicas que flink ofrece por defecto. Para hacer esto basta con
desarrollar el siguiente trigger de flink
package net.zylk.flink.window.trigger; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.Window; @PublicEvolving public class CountTimeTrigger<W extends Window> extends Trigger<Object, W> { private static final long serialVersionUID = 1L; private final long maxCount; private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE); private CountTimeTrigger(long maxCount) { this.maxCount = maxCount; } @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { //actualizo la ventana de tiempo ctx.registerProcessingTimeTimer(window.maxTimestamp()); ReducingState<Long> count = ctx.getPartitionedState(stateDesc); count.add(1L); if (count.get() >= maxCount) { count.clear(); return TriggerResult.FIRE_AND_PURGE; //porque no quiero que los vuelva a procesar y así no tengo que hacer un trigger tipo purge (nested) } return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.FIRE; } @Override public void clear(W window, TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); ctx.getPartitionedState(stateDesc).clear(); } @Override public boolean canMerge() { return true; } @Override public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } @Override public String toString() { return "CountTimeTrigger(" + maxCount + ")"; } /** * Creates a trigger that fires once the number of elements in a pane reaches the given count. * * @param maxCount The count of elements at which to fire. * @param <W> The type of {@link Window Windows} on which this trigger can operate. */ public static <W extends Window> CountTimeTrigger<W> of(long maxCount) { return new CountTimeTrigger<>(maxCount); } private static class Sum implements ReduceFunction<Long> { private static final long serialVersionUID = 1L; @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } } }
Y usarlo de la siguiente manera desde flink
package net.zylk.flink.window; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import net.zylk.flink.window.trigger.CountTimeTrigger; public class CountTimeWindowJob { public static void main(String[] args) throws Exception { //nc -lk 9999 comando de unix que levanta el servidor para mandar info final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> dataStream = env.socketTextStream("localhost", 9999); //devuelve el máximo de los tres números que entren o el máximo en 10 segundos dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))).trigger(CountTimeTrigger.of(3)).max(0).print(); env.execute(); } }