Blogs

KMeans con Zeppelin y Spark

Aplicando KMeans en el análisis de datos

Siguiendo estos dos artículos de internet, uno donde se explica como crear un modelo aplicando el algoritmo no supervisado de clusterización (KMeans) y el otro, donde se explica usar Apache Spark desde el notebook zeppelin, he creado un ejemplo que mezcla ambos. Para poder ejecutarlo en el laboratorio de de bigdata que tenemos montado.

Lo que se ha hecho ha sido lo siguiente

  • Cargar los datos de los bancos
  • Seleccionar dos de sus características (edad, balance) y usarlas como features para el algoritmo KMeans
  • Definir el número de agupaciones que quería para el cluster, 8 en este caso
  • Definir que % de los datos se iban a usar para entrenar el modelo y que % para hacer las predicciones
  • Aplicar el modelo
  • Observar los resultados

 

El códio scala que he usado para ello es el siguiente, que se puede pegar directamente en notebook de zeppelin

Primero los imports necesarios

import org.apache.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.clustering.KMeans

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import sqlContext._

Después la definición de mapeo de los campos del csv que contiene los datos

val schema = StructType(Array(
    StructField("age", IntegerType, true),
    StructField("job", StringType, true),
    StructField("marital", StringType, true),
    StructField("education", StringType, true),
    StructField("default", StringType, true),
    StructField("balance", DoubleType, true),
    StructField("housing", StringType, true),
    StructField("loan", StringType, true),
    StructField("contact", StringType, true),
    StructField("day", IntegerType, true),
    StructField("month", StringType, true),
    StructField("duration", IntegerType, true),
    StructField("campaign", IntegerType, true),
    StructField("pdays", IntegerType, true),
    StructField("previous", StringType, true),
    StructField("poutcome", StringType, true),
    StructField("y", StringType, true)
    ))

Una vez hecho esto, importamos el fichero csv y cargamos el csv como un dataFrame para su posterior uso. En este punto hemos tenido que añadir la dependencia de com.databricks.spark.csv como un artifact al interprete de spark de zeppelin. En caso de no hacerlo esta parte dará un error. También hemos dejado el fichero de bank-full.csv en el HDF en la carpeta /tmp del hdfs. Este fichero se puede descargar del tutorial indicado en la introducción.

val df = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("delimiter",";")
    .schema(schema).load("/tmp/bank-full.csv")

Una vez creado el dataFrame las "features" que van a ser usadas para tipificar los puntos y que KMeans pueda decidir que puntos tienen similtud con qué puntos y permita así las clasificación de los mismos. Para KMeans las features tienen que ser numéricas (ya que opera con el concepto de distancia, la agrupación la hace en base a la proximidad de los puntos), por tanto si quisieramos usar el género como parte de nuestra clasificación o el estado civil, habría que hacer una transformación de este dato para poder usarlo.

val featureCols = Array("age","balance")
val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
val df2 = assembler.transform(df)

Indicamos que % de los datos se van a usar para entrenar y que % se va a usar par hacer el test en este caso 70/30

val Array(trainingData, testData) = df2.randomSplit(Array(0.7,0.3))

Una vez preparados los datos aplicaremos el modelo, en este caso con K de 8, es decir que nos agrupe los datos en 8 agrupaciones.

val kmeans = new KMeans().setK(8).setFeaturesCol("features").setPredictionCol("prediction")
val model = kmeans.fit(df2)
val categories = model.transform(testData)

Y ya podríamos hacer queries al modelo creado

z.show(categories.groupBy("prediction").count())

O una query un poco más compleja que nos permite ver que rangos de edad incluye cada grupo del cluster

z.show(categories.select($"age",$"balance",$"prediction").groupBy("age","balance","prediction").agg(count("prediction")).orderBy("age","balance","prediction"))

El ejemplo, desde el punto de vista de  negocio no significa nada, es un mero ejercicio técnico que nos ha permitodo explorar la comprobar que se pùeden usar las técnicas de machine learning con un conjunto de datos de prueba, usando nuestro laboratorio. Es muy recomendable leer con atención los dos post indicados al comienzo, ya que explican con un grado de detalle mucho mayor que este post el uso de ML en spark. Además a diferencia de este post, el post relacionado con el caso de uso de uber si que tiene valor de negocio.

 

More Blog Entries

Simple tips about Alfresco logs

Today I put together some basic tips about Alfresco logs in Alfresco installations. 1....

My first Alfresco Hackathon at BeeCon 2017

Some customers told me that last week I was a little bit eventually consistent , and this...

0 Comments