miércoles, 24 de febrero de 2010

Implementando Transacciones distribuidas con Spring y Atomikos

Una transacción distribuida debe entenderse como aquella transacción(con sus propiedades ACID) que llega a ejecutarse a través de multiples recursos y por recurso debe entenderse una base de datos, una cola de mensajes(JMS), etc..., en distintos hosts, por ejemplo:
Como aseguro que una transacción interbancaria se realice con exito, es decir retirar dinero de un banco y depositarlo en otro, teniendo en cuenta que el gestor de bases de datos del banco origen y destino son diferentes(por ejemplo mysql y postgresql) y que están en distintos hosts.

Definamos entonces de una manera más simple la transacción distribuida de ejemplo:
begin Tx.
retirar dinero del banco origen, por ejemplo MYSQL
depositar dinero al banco destino por ejemplo POSTGRESQL.
si(no excepciones)
commit Tx.
sino
rollback Tx
Una transacción distribuida asegura que se realicen todos los pasos dentro de la transacción de forma correcta o no se realice ninguno.
Por ejemplo si es que a ocurrido una excepción al depositar el dinero en el banco destino entonces la transacción distribuida debe de realizar un rollback y deshacer la acción de retiro de dinero del banco origen.

Lo anterior se realiza mediante un mecanismo llamado commit en dos fases(two-phase commit) el cual es usado para coordinar multiples recursos durante una transacción distribuida, este mecanismo como su nombre lo indica consiste en dos fases:
Fase 1: La fase de preparación.
Fase 2: La fase de compromiso(commit).

Cuando solicitamos un commit hacia una transacción distribuida el administrador transaccional inicia el mecanismo del commit en dos fases de la siguiente manera:

En la fase uno a todos los recursos involucrados en la transacción(bases de datos, colas) se les pregunta por su estado, cada recurso puede responder con cualquiera de los 3 siguientes estados: READY, READ_ONLY y NOT_READY. Si cualquiera de los recursos responde con NOT_READY entonces la transacción entera es deshecha(se realiza un rollback). Si todos los recursos responden con READY entonces estos son comprometidos(commit) en la segunda fase. Los recursos que responden con READ_ONLY no participan de la segunda fase.

En el ejemplo que vamos a ver a continuación usamos el administrador transaccional Atomikos ya que no requiere el uso de un servidor de aplicaciones J2EE, hay que recordar que la administración transaccional(sea local o distribuida) la proporciona el servidor de aplicaciones(jboss, weblogic, websphere, etc).

El ejemplo consiste en realizar una transacción interbancaria simple:

1. Retiro dinero del banco BBVA estando sus cuentas en una base de datos mysql5.0
2. El dinero obtenido del punto 1 lo deposito al banco Scotiabank en una base de datos postgres 8.4

El código fuente es el siguiente:

1. Script de base de datos tanto para mysql como para postgres:
CREATE TABLE cuenta
(
co_cuenta char(10) NOT NULL,
va_monto decimal(10,2),
PRIMARY KEY (co_cuenta)
);

insert into cuenta values('0000000001',10000);
insert into cuenta values('0000000002',10000);
insert into cuenta values('0000000003',10000);
insert into cuenta values('0000000004',10000);
insert into cuenta values('0000000005',10000);

2. Modificar el archivo de configuración del postgresql(postgresql.conf) para habilitar las transacciones distribuidas:
max_prepared_transactions = 20 # zero disables the feature

3. Crear el bean de configuración de spring(application_xa.xml):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">
<!-- 1. Inicialización del log4j: -->
<bean id="log4jInitialization" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean">
<property name="targetClass" value="org.springframework.util.Log4jConfigurer" />
<property name="targetMethod" value="initLogging" />
<property name="arguments">
<list><value>C:\\logs\spring.config</value></list>
</property>
</bean>
<!-- 2. Declaración del dataSource para conexiones al BBVA -->
<bean id="dataSourceBBVA" class="com.atomikos.jdbc.AtomikosDataSourceBean">
<property name="uniqueResourceName" value="MYSQLDS"/>
<property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
<property name="xaProperties">
<props>
<prop key="serverName">localhost</prop>
<prop key="databaseName">bancoBBVA</prop>
<prop key="user">root</prop>
<prop key="password">jdeveloper007</prop>
</props>
</property>
<property name="minPoolSize" value="2"/>
<property name="maxPoolSize" value="10"/>
</bean>
<!-- 3. Declaración del dataSource para conexiones al Scotiabank -->
<bean id="dataSourceScotiabank" class="com.atomikos.jdbc.AtomikosDataSourceBean">
<property name="uniqueResourceName" value="POSTGRESDS"/>
<property name="xaDataSourceClassName" value="org.postgresql.xa.PGXADataSource"/>
<property name="xaProperties">
<props>
<prop key="serverName">localhost</prop>
<prop key="databaseName">bancoScotiabank</prop>
<prop key="user">postgres</prop>
<prop key="password">Jdeveloper007</prop>
</props>
</property>
<property name="minPoolSize" value="2"/>
<property name="maxPoolSize" value="10"/>
</bean>
<!-- 4. Declaración del administrador transaccional que implementa todo el trabajo
transaccional, proporcionado por 'Atomikos' : -->
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
<property name="forceShutdown" value="true"/>
<property name="startupTransactionService" value="true"/>
</bean>

<!-- 5. Declaración del user transaction de bajo nivel -->
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp"></bean>
<!-- 6. Al administrador transaccional JTA de spring le pasamos las referencias
implementadas por 'Atomikos' y no por un servidor de aplicaciones -->
<bean id="administradorTransaccional" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="atomikosTransactionManager"/>
<property name="userTransaction" ref="atomikosUserTransaction"/>
</bean>
<!-- 7. Habilitamos la gestión de transacciones mediante anotaciones: -->
<tx:annotation-driven transaction-manager="administradorTransaccional" />
<!-- 8. Declaración del mapeo sql para la capa de base de datos con iBatis -->
<bean id="sqlMapClient" class="org.springframework.orm.ibatis.SqlMapClientFactoryBean">
<property name="configLocation" value="sql-map-config.xml"/>
</bean>
<!-- 9. Definición de los daos de acceso a datos: -->
<bean id="movimientoBBVADAO" class="pe.com.slcsccy.testspring.model.dao.MovimientoBBVADAOImpl">
<property name="dataSource" ref="dataSourceBBVA"/>
<property name="sqlMapClient" ref="sqlMapClient"/>
</bean>
<bean id="movimientoScotiabankDAO" class="pe.com.slcsccy.testspring.model.dao.MovimientoScotiabankDAOImpl">
<property name="dataSource" ref="dataSourceScotiabank"/>
<property name="sqlMapClient" ref="sqlMapClient"/>
</bean>
<!-- 10. Scaneo de todos los stereotipos @Service -->
<context:component-scan base-package="pe.com.slcsccy.testspring.service"/>
</beans>

4. Creamos los daos y la configuración de ibatis:
4.1 MovimientoBBVADAO.java:
package pe.com.slcsccy.testspring.model.dao;
import pe.com.slcsccy.testspring.dominio.beans.Movimiento;
public interface MovimientoBBVADAO {
public void retirarBBVA(Movimiento cuenta);
}

4.2 MovimientoBBVADAOImpl.java:
package pe.com.slcsccy.testspring.model.dao;
import org.springframework.orm.ibatis.support.SqlMapClientDaoSupport;
import pe.com.slcsccy.testspring.dominio.beans.Movimiento;
public class MovimientoBBVADAOImpl extends SqlMapClientDaoSupport implements MovimientoBBVADAO{
@Override
public void retirarBBVA(Movimiento movimiento) {
this.getSqlMapClientTemplate().update("CUENTAS.retirarBBVA", movimiento);
}
}

4.3 MovimientoScotiabankDAO.java:
package pe.com.slcsccy.testspring.model.dao;
import pe.com.slcsccy.testspring.dominio.beans.Movimiento;
public interface MovimientoScotiabankDAO {
public void depositarScotiabank(Movimiento movimiento)throws Exception;
}

4.4 MovimientoScotiabankDAOImpl.java:
package pe.com.slcsccy.testspring.model.dao;
import java.math.BigDecimal;
import org.springframework.orm.ibatis.support.SqlMapClientDaoSupport;
import pe.com.slcsccy.testspring.dominio.beans.Movimiento;
public class MovimientoScotiabankDAOImpl extends SqlMapClientDaoSupport implements MovimientoScotiabankDAO{
private static final BigDecimal vaLimiteTransferencia = BigDecimal.valueOf(2000);
@Override
public void depositarScotiabank(Movimiento movimiento) throws Exception {
if(movimiento.getVaMonto().compareTo(vaLimiteTransferencia)>0){
throw new Exception("El monto a transferir supera el límite");
}
this.getSqlMapClientTemplate().update("CUENTAS.depositarScotiabank", movimiento);
}
}

4.5 CUENTAS_SqlMap.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE sqlMap PUBLIC "-//ibatis.apache.org//DTD SQL Map 2.0//EN" "http://ibatis.apache.org/dtd/sql-map-2.dtd" >
<sqlMap namespace="CUENTAS">
<update id="retirarBBVA" parameterClass="pe.com.slcsccy.testspring.dominio.beans.Movimiento" >
UPDATE cuenta SET va_monto = va_monto - #vaMonto#
WHERE co_cuenta = #coCuentaInterbancario#
</update>
<update id="depositarScotiabank" parameterClass="pe.com.slcsccy.testspring.dominio.beans.Movimiento" >
UPDATE cuenta SET va_monto = va_monto + #vaMonto#
WHERE co_cuenta = #coCuentaInterbancario#
</update>
</sqlMap>


5. Definimos las clases que representan al servicio:

5.1 FacadeService.java
package pe.com.slcsccy.testspring.service;
import pe.com.slcsccy.testspring.dominio.beans.Movimiento;
public interface FacadeService {
public void transferenciaInterbancaria(Movimiento movimiento) throws Exception;
}

5.2 FacadeServiceImpl.java
package pe.com.slcsccy.testspring.service;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import pe.com.slcsccy.testspring.dominio.beans.Movimiento;
import pe.com.slcsccy.testspring.model.dao.MovimientoBBVADAO;
import pe.com.slcsccy.testspring.model.dao.MovimientoScotiabankDAO;

@Service("facadeService")
@Transactional
public class FacadeServiceImpl implements FacadeService {
private final Logger logger = Logger.getLogger(getClass());
@Autowired
private MovimientoBBVADAO movimientoBBVADAO;
@Autowired
private MovimientoScotiabankDAO movimientoScotiabankDAO;
@Override
@Transactional(rollbackFor=Exception.class,readOnly=false)
public void transferenciaInterbancaria(Movimiento movimiento) throws Exception {
logger.info("INICIANDO TRANSFERENCIA...");
movimientoBBVADAO.retirarBBVA(movimiento);
movimientoScotiabankDAO.depositarScotiabank(movimiento);
logger.info("TRANSFERENCIA FINALIZADA...");
}
}

6. Por ultimo la invocación al servicio anterior
6.1 Inicio2.java:
package pe.com.slcsccy.testspring;
import java.math.BigDecimal;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import pe.com.slcsccy.testspring.dominio.beans.Movimiento;
import pe.com.slcsccy.testspring.service.FacadeService;

public class Inicio2 {

public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("application_xa.xml");
FacadeService servicioTx = (FacadeService)context.getBean("facadeService");
Movimiento movimiento = new Movimiento();
movimiento.setCoCuentaInterbancario("0000000004");
movimiento.setVaMonto(BigDecimal.valueOf(1500));
try {
servicioTx.transferenciaInterbancaria(movimiento);
} catch (Exception e) {
e.printStackTrace();
}
}
}

Si quieres que se realice un rollback al movimiento colocale un monto mayor a 2000 y verás que hace un rollback a la base del BBVA tambien.

2 comentarios:

  1. Me ayudo mucho, aunque he pensado hacer un manejador propio, no se que grado de complejidad tendrá.

    ResponderBorrar
  2. Hola, gracias por el tutorial que esta muy bueno. Me puedes ayudar con una duda, si yo a una base de datos, configuro su datasource usando XA, es necesario hacer el código mencionado en el ejemplo. Gracias.

    ResponderBorrar

Es bueno comunicarnos, comenta!!.