jueves, 2 de noviembre de 2023

RDD en spark con scala

version en video.


 

 

actualmente estoy haciendo un curso de spark en scala y me pareció buena excusa para inventarme algunos ejercicios usando RDD con spark.

RDD es una sigla para Resilient Distributed Dataset, pero lo que uno pone al utilizarlo es el dataset(o la data), lo resiliente y lo distribuido es de lo que se encarga Spark.

La idea que tuve fue buscar un dataset en kaggle y hacer algunas preguntas que fuera a resolver con spark el dataset que primero encontre fue https://www.kaggle.com/datasets/isabbaggin/transaction-fraudulent-financial-syntheticdata/ este es un dataset sintético (no es data real) de operaciones financieras y fraudulentas.

El dataset tiene las siguientes columnas

  • Transaction id
  • Customer id
  • Merchant id
  • Amount
  • Transaction Time
  • Is fraudulent
  • card type
  • location
  • purchase category
  • customer age

no es un dataset muy grande, y aunque spark esta pensado para hacer procesamiento de grandes cantidades de datos, solo queria intentar cosas entonces me parecio bien.

las preguntas que me quise hacer sobre el dataset ordenadas por el orden de dificultad que creo tienen (aunque ninguna es nada muy complejo) fueron.

  • total de valores de transacciones por cada cliente
  • valor promedio de las operaciones fraudulentas
  • promedio valor de operaciones por cada cliente
  • cuantas operaciones fraudulentas tienen en total los grupos de edades de 20 en 20, y ordenarlos


Total de valores de transacciones por cada cliente

para este pregunta los 2 campos que me interesan son el customer id y el amount, la idea seria tomar estos dos campos y para todas las entradas que tengan el mismo customer id sumar el amount/cantidad.

El codigo con el que se consiguio esto es:



import org.apache.log4j._
import org.apache.spark._

object TotalTransacionesCliente {

  def main(arg: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val sc = new SparkContext("local[*]", "Totaltransacionescliente")

    val input = sc.textFile("data/archive/synthetic_financial_data.csv")
    val skipable_first_row = input.first()
    val useful_csv_rows = input.filter(row => row != skipable_first_row)
    //https://itecnote.com/tecnote/scala-remove-first-row-of-spark-dataframe/
    val clientWithAmountSpent = useful_csv_rows.map(x => {
      val fields = x.split(",")
      (fields.apply(1), fields.apply(3).toFloat)
    })

    val result = clientWithAmountSpent.reduceByKey((a, b) => a + b)

    result.collect.map(println)

  }

}

lo primero que se ve en el archivo son los import hacemos import de spark y de la libreria del logger.

empezamos seteando el nivel del logger a error para que no salga tanta informacion en la consola

luego inicializamos un contexto de spark en este caso "local[*]" indicando que corre en el propio computado y un nombre.

con la siguiente linea leemos el archivo que mencionamos anteriormente, luego de leerlo le quitamos la primera fila al archivo con las 2 lineas siguientes, esto es porque la primera linea es de encabezados y no nos interesan para el calculo.

en todos las preguntas que quiero responder hasta este punto va a ser igual (solo cambiara el nombre del a aplicacion en el constructo de SparkContext) pero eso no afecta en nada

entonces ya tenemos en un RDD (usesul_csv_rows) nuestro archivo, con el primer map que hacemos dejamos solo el id del cliente en el primer campo y la cantida de la transacion en el segundo.

dentro del RDD el primer campo le llaman key y al segundo campo de la tuple llaman value entonces algunas operaciones se hace teniendo en cuenta esto como en la siguiente linea que reducimos por la key es decir tomamos todos los registros que tengan la misma key(en este caso que sean el mismo cliente) y sumamos el value(en este caso sumamos la cantida gastada en esa transacion) dejando una sola fila por cliente que contiene su id y la cantida gastada

ya con esto llamamos la "action" collect, recorremos el resultado con el map y imprimimos. los action cumplen una funcion especial en spark pues este es lazy execution (ejecucion perezoza) y no se empieza a ejecuta sino hasta que se llama un action sobre el RDD que es cuando hacedemos a la data. el action en este caso collect hace que spark genera su plan de ejecucion para las operaciones que definimos y se ejecute.

la salida de esta ejecucion es algo asi


(1017,524947.9)
(1040,462102.34)
(1046,411321.8)
(1042,534393.9)
(1057,491750.62)
(1039,578364.25)
(1064,502542.7)
(1073,479930.88)
(1088,533418.8)

para las siguiente me voy a saltar la parte inicial(de inicializar el contexto de spark, cargar el archivo y eliminar las cabeceras) y la final ( hacer collect del resultado y imprimirlo), pero el codigo completo se puede encontrar aca https://github.com/chalimbu/rdds-blog

Valor promedio de las operaciones fraudulentas


val isFraudulentAndValue=useful_csv_rows.map(x=>{
  val fields= x.split(",")
  (fields.apply(5).toInt==1,fields.apply(3).toFloat)
})
val onlyFraudulent=isFraudulentAndValue.filter(x=>x._1==true)
val averageFraudulent=onlyFraudulent.mapValues(x=>(1,x)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>x._2/x._1)

lo primero que hago en este caso es sacar los valores con un map si es una operacion fraudulenta (que esta marcado en el dataset original como un 0 o 1, asi que pongo que cuando es 1 sea true es decir fraudulenta), y luego el campo 3 (en realidad el 4 por que empieza en 0) que contiene la cantida de la transacion.

ya con estos campos realizo un filtro para quedarme solamente con las operaciones que sean fraudulentas.

posteriormente saco el promedio para esto empiezo agregando 1 a cada registro(que posteriormente usare para contar los registros y poder sacar el promedio), con lo que me queda (true,(1,valor de la operacion)), reduzco por la key para sumar tanto los 1 que son el numero de registros, como la cantida que es valor de la operacion(ya sabemos que solo hay un key igual a verdadero por que filtramos antes las operaciones fraudulentas, finalmente divido el campo 2 de los values que contiene la suma de las operaciones fraudulentas con el total de operaciones.

Promedio valor de operaciones por cada cliente


val clientIdAndValue = useful_csv_rows.map(x => {
  val fields = x.split(",")
  (fields.apply(1), fields.apply(3).toFloat)
})
val promedioOperacionPorClient=clientIdAndValue.mapValues(x=>(1,x)).reduceByKey((op1,op2)=>(op1._1+op2._1,op1._2+op2._2)).mapValues(x=>x._2/x._1)

este es muy similar al anterior solo que en este caso aplico el reduceByKey final en un dataset donde esta como key todos los cliente id con lo que logro sacar el promedio por clientes y no para todo el dataset.

y en este caso no aplico el filtro para operaciones fraudulentas por lo que tengo en cuenta todas las transaciones para el promedio.

Conteo operaciones fraudulentas por grupo de edades ordenado.


val ageFraudulent = useful_csv_rows.map(x => {
  val fields = x.split(",")
  (fields.apply(9).toInt, fields.apply(5).toInt==1)
})
val sumFraudulentByAge=ageFraudulent.filter(x=>x._2).mapValues(x=>1).reduceByKey((op1,op2)=>(op1+op2))
val generatingGroups=sumFraudulentByAge.map(x=>(x._2,x._1)).mapValues(x=>{
  x match {
    case x if x<20 => GruposEdadesMenorA.Veinte
    case x if x<40 => GruposEdadesMenorA.Cuarenta
    case x if x<60 => GruposEdadesMenorA.Sesenta
    case x if x<80 => GruposEdadesMenorA.Ochenta
    case x if x<100 => GruposEdadesMenorA.Cien
    case _ => GruposEdadesMenorA.Mas
  }
}).map(x=>(x._2,x._1)).reduceByKey((x,y)=>(x+y)).map(x=>(x._2,x._1)).sortByKey()

este es en el que mas complejida hay y posiblemente se puede hacer de manera mas sencilla y sino mas clara pero el razonamiento es este

del datase original en el primer map saco la edad(convertida a entero) y si la transacion fue fraudulenta, posterior a esto con el filter me quedo con las operaciones que fueron fraudulentas.

debido a que ya no necesito el value de operaciones fraudulentas simplemente lo convierto con el mapa a 1 con lo que me queda un rdd, donde tengo la edad como key y el numero 1 como value, procedo entonces a reducer por key para terminar con cuantas operaciones fraudulentas tiene cada edad

paso entonces a rotar los key y los values, para dejar en el key el numero de operaciones fraudulentas para esa edad, y en el value la edad. con esto puedo entonces trasformar la edad a los rangos que defini usando mapValue, para quedarme entonces con un rdd que tiene (numero operaciones fraudulentas para una edad, a que rango de edad corresponde). teniendo esto luego del mapa con el match grande, roto los key y los values nuevamente para hacer un reduce por key donde el key va ser los grupo de edad y quiero sumar todas las operaciones fraudulentas en cada categoria.

finalmente hago un mapa donde roto los valores para poder ordenar por key(teniendo en la key el conteo de operaciones fraudulentas)

si se quiere correr este codigo, posiblemente inventarse mas preguntas, resolvarlas con un poco de codigo tengo entendido que spark necesita por lo menos 8 gb para correr, se puede clonar el repo https://github.com/chalimbu/rdds-blog y correr para el caso de intellij solo me toco colocar que la version de java con la que corria era java 11, por que por defecto me estaba tomando la 18

referencias.
  • https://www.kaggle.com/datasets/isabbaggin/transaction-fraudulent-financial-syntheticdata/ . el dataset usado para probar
  • https://docs.scala-lang.org/tour/pattern-matching.html el match del ultimo ejercisio
  •  https://www.opinionatedgeek.com/codecs/htmlencoder para que el codigo se formatee
  • https://itecnote.com/tecnote/scala-remove-first-row-of-spark-dataframe/ donde encontre como remover los encabezados
  • https://www.udemy.com/course/apache-spark-with-scala-hands-on-with-big-data/ el curso de spark mencionado

jueves, 31 de agosto de 2023

Sagas en microservicios.


 

Información sacada del libro Microservice patterns with examples in java, de Chris Richardson(2019)
Hacer microservicios no es facil una de las dificultades al hacerlos es como generar operaciones que requieren la interacción de múltiples servicios.

Para conseguir esto se usa una estrategia llamada sagas, estas permiten conectar las funcionalidades de varios servicios para realizar una operación

Mediante sagas los servicios se comunican para lograr una operacion conjunta y como todo existen muchas maneras de hacerlo para no entrar en todas las variantes voy a escribir del enfoque recomendado en el libro para la mayoría de casos(aunque en el libro se explican sus variantes ventajas y desventajas).

Debido a que múltiples servicios van a interactuar en la operación lo recomendado inicia con un retorno temprano, esto significa responder al cliente con un valor(un id generalmente) aunque la operación(en el caso de comandos) no esté completa, luego el cliente (por ejemplo una pagina web, o app móvil) puede consultar para verificar si su operación ya se aplicó y su información actualizada (mandando el id que recibió de manera temprana el servicio/s que reciben la petición pueden confirmar si la operación que hizo el cliente ya se aplicó)

 



Se recomienda hacer esto (aunque se convierta en una lógica mas completa para el cliente) por que si no se hace la disponibilidad del sistema disminuye al tener que estar arriba/disponibles al mismo tiempo todos los servicios involucrados en una operación para que la operación no falle(recordando que la disponibilidad es la multiplicación de la disponibilidad de los servicios involucrados)
Generalmente en una arquitectura de microservicio este servicio A va ser un api gateway (o backend for frontend), teniendo esto claro entramos a las sagas que las hay de 2 tipos.



  • sagas coreografiadas: donde los servicios se comunican directamente entre sí, sin una aplicación o sistema central que los coordine, solo se recomienda si una operación muy sencilla pues se vuelve muy complejo de mantener muy rápido (esto se da a que la lógica para una sola operación, sus estados queda distribuida entre los servicios mismo, por que lo para entender una operación hay que entrar a ver cada servicio)
  • sagas orquestadas: una aplicación se encarga de mantener una máquina de estado de la operación y con ella conectarse con los servicios correspondientes y servir de punto central de comunicación entre servicios, tener una máquina de estado para cada operación simplifica el entendimiento de como funciona una operación y sus interacciones con otros servicios

Debido a esto las sagas orquestadas son mas recomendadas en general (solo podrian estar bien las coreografiadas cuando se estén haciendo operaciones sencillas), en el libro recomiendan que las sagas se conecten mediante una estrategia request-response asíncrona esta estrategia consiste en usar un sistema de mensajería (como rabbit o kafka) pero simulando el modelo de petición respuesta.


al usar un broker de mensajería con el retorno temprano de ante, se logra mejorar la disponibilidad (si el servicio que recibe el mensaje esta abajo el broker mantiene el mensaje hasta que el servicio vuelva a subir, y como se respondio rapidamente al cliente la petición no se queda esperando ni falla) y al ser un modelo request response la comunicación es más sencilla( se usa un id de mensaje como en la imagen que permite que el servicio que hace la petición vincula la respuesta con la misma petición con hizo al recibir el mismo id, también se le llama aveces requestId).

Entonces el orquestador de sagas (que puede ser un servicio o correr como parte de uno de los servicios) con su interacciones luce algo así

 



todavía faltan varias cosas de sagas en microservicios que se mencionan, pero ya se alargo bastante por lo que seguirá en una parte 2, las cosas que faltan que recuerdo son.

  • tipos de operaciones en sagas(compesatable, retriable, pivot)
  • compensation transaction.
  • isolation y semantic locking