Blogs

Flink, en producción, bajo control

Watchdog para Flink con reinicio automático y notificaciones por Slack y Email

Uno de los problemas más comunes a la hora de poner en producción un proyecto basado en Flink, es el control del estado de los jobs que se están ejecutando.

Flink monitoriza los jobs e inicia una política de reinicio configurable en caso de que alguno de los jobs falle. Permite especificar políticas como la siguiente:

"En caso de que un job falle, espera dos minutos, e intenta reiniciarlo. En caso de que falle de nuevo, vuelvelo a intentar un máximo de tres veces".

Sin embargo, a veces esto no es suficiente, ya que diferentes problemas (como por ejemplo, un problema puntual de conectividad), pueden hacer que este reinicio fracase.

En este artículo, solucionaremos este problema. Para ello, crearemos un demonio python que nos permitirá comprobar el estado actual de los jobs utilizando la API REST de flink, re-desplegar el job en caso de que haya fallado, y enviar una notificación mediante correo y Slack, como se ve a continuación:

API de monitorización REST de Flink

Flink ofrece tanto un interfaz web como un API REST para gestionar la monitorización de los jobs, que se ejecuta como parte del JobManager. Esto nos permite consultar el estado actual de los jobs, ver que jobs han fallado, e incluso consultar la excepción que ha generado el error.

Por defecto, el servidor web escucha en el puerto 8081, aunque se puede configurar en el fichero flink-conf.yaml cambiando el parámetro jobmanager.web.port.

Aún así, esto nos obliga a revisar el interfaz de manera continua para estar seguros de que los jobs no han sufrido una caída. Este interfaz no permite ejecutar acciones en caso de que alguno de los jobs haya fallado tras aplicar la política de reinicio, como pueden ser el re-despliegue del job o el envio de una notificación para su revisión.

Utilizaremos la API para comprobar el estado de los Jobs que queremos monitorizar, así como para obtener las excepciones del Job que ha fallado. Los paths que permiten recuperar esa información mediante una consulta GET son:

  • http://flinkhost:8081/joboverview/running: Devuelve un listado de los jobs que están ejecutandose.
  • http://flinkhost:8081/joboverview/completed: Devuelve un listado de los jobs que han finalizado.
  • http://flinkhost:8081/jobs/<jobid>/exceptions: Devuelve las excepciones asociadas al job especificado mediante jobid.

La API incluye muchos más métodos aparte de este. Para más información sobre la API, se puede consultar:

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html

Script de monitorización y reinicio automático

Utilizaremos Python para crear un script que consulte si un listado de jobs está siendo ejecutado, y en caso de que alguno de ellos no esté haciéndolo, reiniciemos de manera automática el job.

NOTA: Para simplificar, ejecutaremos el demonio en la misma máquina en la que se esté ejecutando flink, de manera que el reinicio se gestione mediante un comando de shell. En la documentación de la API de Flink, se puede consultar como realizar la carga de un jar y su ejecución mediante la API REST, si es necesario realizar la ejecución desde otra máquina.

Para las peticiones a la API utilizaremos la librería requests. La libreria json nos facilitará el procesado de la respuesta. Para la ejecución de comandos shell, utilizaremos la librería subprocess.

Nuestro script recibirá como entrada:

  • flinkHost: El dominio donde se aloja flink
  • flinkPort: El puerto de la API
  • jobList: Lista separada por comas del nombre de los jobs que se quieren monitorizar.
  • launchJobCommand: Comando que se utilizará para ejecutar el job en caso de que se haya caído. Especificaremos un wildcard, #JOBNAME, de manera que se pueda generalizar el comando para la ejecución de jobs con diferente nombre.
  • autoRestart: Flag que permite especificar si se desea o no el reinicio automático del job caído.

El script funcionará de la siguiente manera:
1. Comprobar si el listado de jobs a monitorizar está siendo ejecutando consultando los jobs activos.
2. En caso de que uno de ellos no esté ejecutandose, se consulta la lista de jobs finalizados, para obtener el id del job fallido, y consultar que excepción ha lanzado.
3. Se escribe en un fichero de log el fallo y la excepción que lo ha provocado.
4. En caso de que el reinicio automático este habilitado, se ejecuta el comando especificado en launchJobCommand, sustituyendo #JOBNAME por el nombre del job que ha fallado.

Con esto, ya tendremos tanto la monitorización como el reinicio automático configurado. Sin embargo, sería interesante poder enviar notificaciones en caso de fallo para que el desarrollador sea consciente de este, y pueda investigar las causas mediante la excepción lanzada.

Envio de notificaciones por email

Crearemos un método para el envío de un email con la notificación de fallo del job. Para ello, utilizaremos la librería smtplib, que nos permite conectarnos y autenticarnos en nuestro servidor SMTP para realizar el envío.

Añadiremos un parámetro de entrada que reciba un JSON de configuración como el que se muestra a continuación:

{
  "from":"flinkstatus@mail.com",
  "to":"youremail@mail.com",
  "cc":"otheremail@mail.com",
  "subject":"FLINK ALERT!",
  "smtpserver":"smtp.mysmtpserver.com",
  "login":"my_user",
  "password":"worldsafestpassword"
}

Envio de notificaciones por Slack

Para realizar el envío de notificaciones a Slack, crearemos un webhook, y lo utilizaremos para enviar las notificaciones.

Para crear el webhook, se accede a https://my.slack.com/services/new/incoming-webhook/ y se configura un canal.

Una vez creado el webhook, creamos un método que se comunique con la API REST de slack utilizando las librerías request y json. Además, añadimos un parámetro de entrada a nuestro script que acepte una configuración en formato JSON, que de momento, solo llevará incluido el webhook de Slack, como se ve a continuación:

{
  "webhookUrl":""
}

Con esto, podemos recibir en Slack mensajes alertándonos del error sufrido en Flink.

Código

El codigo se encuentra en el siguiente github:

https://github.com/zylklab/flink-monitoring-tools/tree/master/flink_watchdog

Conclusiones

Como hemos visto, la API REST de Flink nos facilita la creación de herramientas que permitan gestionar la monitorización, automatización de arranque y envio de notificaciones.

Esta posibilidad, hace que la gestión de un cluster de Flink en producción, así como la resolución automática de problemas, se convierta en algo sencillo de manejar, y abra un mundo de posibilidades, como pueden ser:

  • El envío de estas alertas/notificaciones a Ambari.
  • La gestión del escalado del cluster, no solo midiendo el rendimiento del hardware, sino relacionándolo con métricas de negocio.
  • La gestión del paralelismo de los jobs relacionándolo con métricas de negocio.

Pero esto, lo dejo para futuros posts ^^.

More Blog Entries

Dockers en YARN usando HDP 3

Una de las características más interesantes de la versión 3 de hadoop es la evolución que ha...

0 Comments