Watchdog para Flink con reinicio automático y notificaciones por Slack y Email
Uno de los problemas más comunes a la hora de poner
en producción un proyecto basado en Flink, es el
control del estado de los jobs que
se están ejecutando.
Flink monitoriza los jobs e inicia una política de
reinicio configurable en caso de que alguno de los jobs falle. Permite especificar
políticas como la siguiente:
dos minutos, e intenta reiniciarlo. En caso de que falle de nuevo,
vuelvelo a intentar un máximo de tres veces".
Sin embargo, a veces esto no es suficiente, ya que
diferentes problemas (como por ejemplo, un problema puntual de
conectividad), pueden hacer que este reinicio fracase.
En este artículo, solucionaremos este problema. Para ello, crearemos
un demonio python que nos permitirá comprobar el
estado actual de los jobs utilizando
la API REST de flink, re-desplegar el job en caso de
que haya fallado, y enviar una notificación mediante
correo y Slack, como se ve a continuación:
API de monitorización REST de Flink
Flink ofrece tanto un interfaz
web como un API REST para gestionar la monitorización
de los jobs, que se ejecuta como parte del JobManager. Esto nos
permite consultar el estado actual de los jobs, ver que jobs han
fallado, e incluso consultar la excepción que ha generado el error.
Por defecto, el servidor web escucha en el puerto
8081, aunque se puede configurar en el fichero
flink-conf.yaml cambiando el parámetro jobmanager.web.port.
Aún así, esto nos obliga a revisar el
interfaz de manera continua para
estar seguros de que los jobs no han sufrido una caída. Este interfaz
no permite
ejecutar
acciones en caso de que alguno de los jobs haya
fallado tras aplicar la política de reinicio, como pueden ser el
re-despliegue del job o el envio de una
notificación para su revisión.
Utilizaremos la API para comprobar el estado de los Jobs que queremos
monitorizar, así como para obtener las excepciones del Job que ha
fallado. Los paths que permiten recuperar esa información mediante una
consulta GET son:
-
http://flinkhost:8081/joboverview/running: Devuelve
un listado de los jobs que están ejecutandose. -
http://flinkhost:8081/joboverview/completed:
Devuelve un listado de los jobs que han finalizado. -
http://flinkhost:8081/jobs/<jobid>/exceptions:
Devuelve las excepciones asociadas al job
especificado mediante jobid.
La API incluye muchos más métodos aparte de este. Para más
información sobre la API, se puede consultar:
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html
Script de monitorización y reinicio automático
Utilizaremos Python para crear un script que
consulte si un listado de jobs está siendo ejecutado, y en caso de que
alguno de ellos no esté haciéndolo, reiniciemos de manera automática
el job.
demonio en la misma máquina en la que se esté ejecutando flink, de
manera que el reinicio se gestione mediante un comando de shell. En la
documentación de la API de Flink, se puede consultar como realizar la
carga de un jar y su ejecución mediante la API REST, si es necesario
realizar la ejecución desde otra máquina.
Para las peticiones a la API utilizaremos la librería
requests. La libreria json nos
facilitará el procesado de la respuesta. Para la ejecución de comandos
shell, utilizaremos la librería subprocess.
Nuestro script recibirá como entrada:
- flinkHost: El dominio donde se aloja flink
- flinkPort: El puerto de la API
-
jobList: Lista separada por comas del nombre de los
jobs que se quieren monitorizar. -
launchJobCommand: Comando que se utilizará para
ejecutar el job en caso de que se haya caído. Especificaremos un
wildcard, #JOBNAME, de manera que se pueda generalizar el comando
para la ejecución de jobs con diferente nombre. -
autoRestart: Flag que permite especificar si se
desea o no el reinicio automático del job caído.
El script funcionará de la siguiente manera:
1.
Comprobar si el listado de jobs a
monitorizar está siendo ejecutando consultando los jobs activos.
2. En caso de que uno de ellos no esté ejecutandose,
se consulta la lista de jobs
finalizados, para obtener el id del job fallido, y
consultar que excepción ha lanzado.
3. Se
escribe en un fichero de log el fallo y la excepción
que lo ha provocado.
4. En caso de que el reinicio automático
este habilitado, se ejecuta el comando especificado
en launchJobCommand, sustituyendo #JOBNAME por el nombre del
job que ha fallado.
Con esto, ya tendremos tanto la monitorización como
el reinicio automático configurado. Sin embargo,
sería interesante poder enviar notificaciones en caso de fallo para
que el desarrollador sea consciente de este, y pueda investigar las
causas mediante la excepción lanzada.
Envio de notificaciones por email
Crearemos un método para el envío de un email con la
notificación de fallo del job. Para ello, utilizaremos la librería
smtplib, que nos permite conectarnos y
autenticarnos en nuestro servidor
SMTP para realizar el envío.
Añadiremos un parámetro de entrada que reciba un
JSON de configuración como el que se
muestra a continuación:
{ "from":"flinkstatus@mail.com", "to":"youremail@mail.com", "cc":"otheremail@mail.com", "subject":"FLINK ALERT!", "smtpserver":"smtp.mysmtpserver.com", "login":"my_user", "password":"worldsafestpassword" }
Envio de notificaciones por Slack
Para realizar el envío de notificaciones a Slack,
crearemos un webhook, y lo utilizaremos para enviar
las notificaciones.
Para crear el webhook, se accede a https://my.slack.com/services/new/incoming-webhook/
y se configura un canal.
Una vez creado el webhook, creamos un método que se comunique con la
API REST de slack utilizando las librerías request y
json. Además, añadimos un parámetro de entrada a
nuestro script que acepte una configuración en formato
JSON, que de momento, solo llevará incluido el
webhook de Slack, como se ve a continuación:
{ "webhookUrl":"" }
Con esto, podemos recibir en Slack mensajes
alertándonos del error sufrido en Flink.
Código
El codigo se encuentra en el siguiente github:
https://github.com/zylklab/flink-monitoring-tools/tree/master/flink_watchdog
Conclusiones
Como hemos visto, la API REST de Flink nos facilita la creación de
herramientas que permitan gestionar la monitorización,
automatización de arranque y envio de notificaciones.
Esta posibilidad, hace que la gestión de un cluster
de Flink en producción, así como la resolución automática de
problemas, se convierta en algo sencillo de manejar,
y abra un mundo de posibilidades, como pueden ser:
- El envío de estas
alertas/notificaciones a Ambari. - La gestión del escalado del
cluster, no solo midiendo el rendimiento del
hardware, sino relacionándolo con métricas de negocio. - La gestión del paralelismo de los
jobs relacionándolo con métricas
de negocio.
Pero esto, lo dejo para futuros posts
^^.