Uno de los problemas más comunes a la hora de poner en producción un proyecto basado en Flink, es el control del estado de los jobs que se están ejecutando.
Flink monitoriza los jobs e inicia una política de reinicio configurable en caso de que alguno de los jobs falle. Permite especificar políticas como la siguiente:
Sin embargo, a veces esto no es suficiente, ya que diferentes problemas (como por ejemplo, un problema puntual de conectividad), pueden hacer que este reinicio fracase.
En este artículo, solucionaremos este problema. Para ello, crearemos un demonio python que nos permitirá comprobar el estado actual de los jobs utilizando la API REST de flink, re-desplegar el job en caso de que haya fallado, y enviar una notificación mediante correo y Slack, como se ve a continuación:
Flink ofrece tanto un interfaz web como un API REST para gestionar la monitorización de los jobs, que se ejecuta como parte del JobManager. Esto nos permite consultar el estado actual de los jobs, ver que jobs han fallado, e incluso consultar la excepción que ha generado el error.
Por defecto, el servidor web escucha en el puerto 8081, aunque se puede configurar en el fichero flink-conf.yaml cambiando el parámetro jobmanager.web.port.
Aún así, esto nos obliga a revisar el interfaz de manera continua para estar seguros de que los jobs no han sufrido una caída. Este interfaz no permite ejecutar acciones en caso de que alguno de los jobs haya fallado tras aplicar la política de reinicio, como pueden ser el re-despliegue del job o el envio de una notificación para su revisión.
Utilizaremos la API para comprobar el estado de los Jobs que queremos monitorizar, así como para obtener las excepciones del Job que ha fallado. Los paths que permiten recuperar esa información mediante una consulta GET son:
La API incluye muchos más métodos aparte de este. Para más información sobre la API, se puede consultar:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
Utilizaremos Python para crear un script que consulte si un listado de jobs está siendo ejecutado, y en caso de que alguno de ellos no esté haciéndolo, reiniciemos de manera automática el job.
Para las peticiones a la API utilizaremos la librería requests. La libreria json nos facilitará el procesado de la respuesta. Para la ejecución de comandos shell, utilizaremos la librería subprocess.
Nuestro script recibirá como entrada:
El script funcionará de la siguiente manera: 1. Comprobar si el listado de jobs a monitorizar está siendo ejecutando consultando los jobs activos. 2. En caso de que uno de ellos no esté ejecutandose, se consulta la lista de jobs finalizados, para obtener el id del job fallido, y consultar que excepción ha lanzado. 3. Se escribe en un fichero de log el fallo y la excepción que lo ha provocado. 4. En caso de que el reinicio automático este habilitado, se ejecuta el comando especificado en launchJobCommand, sustituyendo #JOBNAME por el nombre del job que ha fallado.
Con esto, ya tendremos tanto la monitorización como el reinicio automático configurado. Sin embargo, sería interesante poder enviar notificaciones en caso de fallo para que el desarrollador sea consciente de este, y pueda investigar las causas mediante la excepción lanzada.
Crearemos un método para el envío de un email con la notificación de fallo del job. Para ello, utilizaremos la librería smtplib, que nos permite conectarnos y autenticarnos en nuestro servidor SMTP para realizar el envío.
Añadiremos un parámetro de entrada que reciba un JSON de configuración como el que se muestra a continuación:
{ "from":"flinkstatus@mail.com", "to":"youremail@mail.com", "cc":"otheremail@mail.com", "subject":"FLINK ALERT!", "smtpserver":"smtp.mysmtpserver.com", "login":"my_user", "password":"worldsafestpassword" }
Para realizar el envío de notificaciones a Slack, crearemos un webhook, y lo utilizaremos para enviar las notificaciones.
Para crear el webhook, se accede a https://my.slack.com/services/new/incoming-webhook/ y se configura un canal.
Una vez creado el webhook, creamos un método que se comunique con la API REST de slack utilizando las librerías request y json. Además, añadimos un parámetro de entrada a nuestro script que acepte una configuración en formato JSON, que de momento, solo llevará incluido el webhook de Slack, como se ve a continuación:
{ "webhookUrl":"" }
Con esto, podemos recibir en Slack mensajes alertándonos del error sufrido en Flink.
El codigo se encuentra en el siguiente github:
https://github.com/zylklab/flink-monitoring-tools/tree/master/flink_watchdog
Como hemos visto, la API REST de Flink nos facilita la creación de herramientas que permitan gestionar la monitorización, automatización de arranque y envio de notificaciones.
Esta posibilidad, hace que la gestión de un cluster de Flink en producción, así como la resolución automática de problemas, se convierta en algo sencillo de manejar, y abra un mundo de posibilidades, como pueden ser:
Pero esto, lo dejo para futuros posts ^^.