version en video.
actualmente estoy haciendo un curso de spark en scala y me pareció buena excusa para inventarme algunos ejercicios usando RDD con spark.
RDD es una sigla para Resilient Distributed Dataset, pero lo que uno pone al utilizarlo es el dataset(o la data), lo resiliente y lo distribuido es de lo que se encarga Spark.
La idea que tuve fue buscar un dataset en kaggle y hacer algunas preguntas que fuera a resolver con spark el dataset que primero encontre fue https://www.kaggle.com/datasets/isabbaggin/transaction-fraudulent-financial-syntheticdata/ este es un dataset sintético (no es data real) de operaciones financieras y fraudulentas.
El dataset tiene las siguientes columnas
- Transaction id
- Customer id
- Merchant id
- Amount
- Transaction Time
- Is fraudulent
- card type
- location
- purchase category
- customer age
no es un dataset muy grande, y aunque spark esta pensado para hacer procesamiento de grandes cantidades de datos, solo queria intentar cosas entonces me parecio bien.
las preguntas que me quise hacer sobre el dataset ordenadas por el orden de dificultad que creo tienen (aunque ninguna es nada muy complejo) fueron.
- total de valores de transacciones por cada cliente
- valor promedio de las operaciones fraudulentas
- promedio valor de operaciones por cada cliente
- cuantas operaciones fraudulentas tienen en total los grupos de edades de 20 en 20, y ordenarlos
Total de valores de transacciones por cada cliente
para este pregunta los 2 campos que me interesan son el customer id y el amount, la idea seria tomar estos dos campos y para todas las entradas que tengan el mismo customer id sumar el amount/cantidad.
El codigo con el que se consiguio esto es:
import org.apache.log4j._
import org.apache.spark._
object TotalTransacionesCliente {
def main(arg: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = new SparkContext("local[*]", "Totaltransacionescliente")
val input = sc.textFile("data/archive/synthetic_financial_data.csv")
val skipable_first_row = input.first()
val useful_csv_rows = input.filter(row => row != skipable_first_row)
//https://itecnote.com/tecnote/scala-remove-first-row-of-spark-dataframe/
val clientWithAmountSpent = useful_csv_rows.map(x => {
val fields = x.split(",")
(fields.apply(1), fields.apply(3).toFloat)
})
val result = clientWithAmountSpent.reduceByKey((a, b) => a + b)
result.collect.map(println)
}
}
lo primero que se ve en el archivo son los import hacemos import de spark y de la libreria del logger.
empezamos seteando el nivel del logger a error para que no salga tanta informacion en la consola
luego inicializamos un contexto de spark en este caso "local[*]" indicando que corre en el propio computado y un nombre.
con la siguiente linea leemos el archivo que mencionamos anteriormente, luego de leerlo le quitamos la primera fila al archivo con las 2 lineas siguientes, esto es porque la primera linea es de encabezados y no nos interesan para el calculo.
en todos las preguntas que quiero responder hasta este punto va a ser igual (solo cambiara el nombre del a aplicacion en el constructo de SparkContext) pero eso no afecta en nada
entonces ya tenemos en un RDD (usesul_csv_rows) nuestro archivo, con el primer map que hacemos dejamos solo el id del cliente en el primer campo y la cantida de la transacion en el segundo.
dentro del RDD el primer campo le llaman key y al segundo campo de la tuple llaman value entonces algunas operaciones se hace teniendo en cuenta esto como en la siguiente linea que reducimos por la key es decir tomamos todos los registros que tengan la misma key(en este caso que sean el mismo cliente) y sumamos el value(en este caso sumamos la cantida gastada en esa transacion) dejando una sola fila por cliente que contiene su id y la cantida gastada
ya con esto llamamos la "action" collect, recorremos el resultado con el map y imprimimos. los action cumplen una funcion especial en spark pues este es lazy execution (ejecucion perezoza) y no se empieza a ejecuta sino hasta que se llama un action sobre el RDD que es cuando hacedemos a la data. el action en este caso collect hace que spark genera su plan de ejecucion para las operaciones que definimos y se ejecute.
la salida de esta ejecucion es algo asi
(1017,524947.9)
(1040,462102.34)
(1046,411321.8)
(1042,534393.9)
(1057,491750.62)
(1039,578364.25)
(1064,502542.7)
(1073,479930.88)
(1088,533418.8)
para las siguiente me voy a saltar la parte inicial(de inicializar el contexto de spark, cargar el archivo y eliminar las cabeceras) y la final ( hacer collect del resultado y imprimirlo), pero el codigo completo se puede encontrar aca https://github.com/chalimbu/rdds-blog
Valor promedio de las operaciones fraudulentas
val isFraudulentAndValue=useful_csv_rows.map(x=>{
val fields= x.split(",")
(fields.apply(5).toInt==1,fields.apply(3).toFloat)
})
val onlyFraudulent=isFraudulentAndValue.filter(x=>x._1==true)
val averageFraudulent=onlyFraudulent.mapValues(x=>(1,x)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>x._2/x._1)
lo primero que hago en este caso es sacar los valores con un map si es una operacion fraudulenta (que esta marcado en el dataset original como un 0 o 1, asi que pongo que cuando es 1 sea true es decir fraudulenta), y luego el campo 3 (en realidad el 4 por que empieza en 0) que contiene la cantida de la transacion.
ya con estos campos realizo un filtro para quedarme solamente con las operaciones que sean fraudulentas.
posteriormente saco el promedio para esto empiezo agregando 1 a cada registro(que posteriormente usare para contar los registros y poder sacar el promedio), con lo que me queda (true,(1,valor de la operacion)), reduzco por la key para sumar tanto los 1 que son el numero de registros, como la cantida que es valor de la operacion(ya sabemos que solo hay un key igual a verdadero por que filtramos antes las operaciones fraudulentas, finalmente divido el campo 2 de los values que contiene la suma de las operaciones fraudulentas con el total de operaciones.
Promedio valor de operaciones por cada cliente
val clientIdAndValue = useful_csv_rows.map(x => {
val fields = x.split(",")
(fields.apply(1), fields.apply(3).toFloat)
})
val promedioOperacionPorClient=clientIdAndValue.mapValues(x=>(1,x)).reduceByKey((op1,op2)=>(op1._1+op2._1,op1._2+op2._2)).mapValues(x=>x._2/x._1)
este es muy similar al anterior solo que en este caso aplico el reduceByKey final en un dataset donde esta como key todos los cliente id con lo que logro sacar el promedio por clientes y no para todo el dataset.
y en este caso no aplico el filtro para operaciones fraudulentas por lo que tengo en cuenta todas las transaciones para el promedio.
Conteo operaciones fraudulentas por grupo de edades ordenado.
val ageFraudulent = useful_csv_rows.map(x => {
val fields = x.split(",")
(fields.apply(9).toInt, fields.apply(5).toInt==1)
})
val sumFraudulentByAge=ageFraudulent.filter(x=>x._2).mapValues(x=>1).reduceByKey((op1,op2)=>(op1+op2))
val generatingGroups=sumFraudulentByAge.map(x=>(x._2,x._1)).mapValues(x=>{
x match {
case x if x<20 => GruposEdadesMenorA.Veinte
case x if x<40 => GruposEdadesMenorA.Cuarenta
case x if x<60 => GruposEdadesMenorA.Sesenta
case x if x<80 => GruposEdadesMenorA.Ochenta
case x if x<100 => GruposEdadesMenorA.Cien
case _ => GruposEdadesMenorA.Mas
}
}).map(x=>(x._2,x._1)).reduceByKey((x,y)=>(x+y)).map(x=>(x._2,x._1)).sortByKey()
este es en el que mas complejida hay y posiblemente se puede hacer de manera mas sencilla y sino mas clara pero el razonamiento es este
del datase original en el primer map saco la edad(convertida a entero) y si la transacion fue fraudulenta, posterior a esto con el filter me quedo con las operaciones que fueron fraudulentas.
debido a que ya no necesito el value de operaciones fraudulentas simplemente lo convierto con el mapa a 1 con lo que me queda un rdd, donde tengo la edad como key y el numero 1 como value, procedo entonces a reducer por key para terminar con cuantas operaciones fraudulentas tiene cada edad
paso entonces a rotar los key y los values, para dejar en el key el numero de operaciones fraudulentas para esa edad, y en el value la edad. con esto puedo entonces trasformar la edad a los rangos que defini usando mapValue, para quedarme entonces con un rdd que tiene (numero operaciones fraudulentas para una edad, a que rango de edad corresponde). teniendo esto luego del mapa con el match grande, roto los key y los values nuevamente para hacer un reduce por key donde el key va ser los grupo de edad y quiero sumar todas las operaciones fraudulentas en cada categoria.
finalmente hago un mapa donde roto los valores para poder ordenar por key(teniendo en la key el conteo de operaciones fraudulentas)
si se quiere correr este codigo, posiblemente inventarse mas preguntas, resolvarlas con un poco de codigo tengo entendido que spark necesita por lo menos 8 gb para correr, se puede clonar el repo https://github.com/chalimbu/rdds-blog y correr para el caso de intellij solo me toco colocar que la version de java con la que corria era java 11, por que por defecto me estaba tomando la 18
- https://www.kaggle.com/datasets/isabbaggin/transaction-fraudulent-financial-syntheticdata/ . el dataset usado para probar
- https://docs.scala-lang.org/tour/pattern-matching.html el match del ultimo ejercisio
- https://www.opinionatedgeek.com/codecs/htmlencoder para que el codigo se formatee
- https://itecnote.com/tecnote/scala-remove-first-row-of-spark-dataframe/ donde encontre como remover los encabezados
- https://www.udemy.com/course/apache-spark-with-scala-hands-on-with-big-data/ el curso de spark mencionado