Atomikos Forum

no rollback when using Spring, ActiveMQ, JOOQ, and Postgres

Database writes are not rolling back as I expected.
I've spent many hours reading software documentation and web postings.
I have not been able to resolve the issue.
I'm hoping you folks can help me.

Scenario
. My application pulls a message from a queue, extracts data from the message, and writes it to a database.
. The method that writes to the database does 2 SQL inserts.
. The second insert gets an exception:
  org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "table2_PK"
. However, the first insert is still getting committed to the database.

Relevant Software
. spring-boot 1.2.5.RELEASE
. atomikos-util 3.9.3 (from spring-boot-starter-jta-atomikos 1.2.5.RELEASE)
. activemq-client 5.1.2
. jooq 3.6.2
. postgresql 9.4-1201-jdbc41

Application Code
I've pasted the relevant parts of my code below.
 1. GdmServer - my "server" class, which also declares Spring bean configurations
 2. PortSIQueue - my JMS MessageListener class
 3. Kernel - my worker class, a Spring bean invoked by my MessageListener, i.e. code that writes to database

I'd appreciate any help anyone can offer.
Thanks

------------------------------------------------------------
package com.sm.gis.gdm;

import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;

@SpringBootApplication
@EnableJms
@EnableTransactionManagement
public class GdmServer {
   
    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    GisConfig                      gisConfig;
   
    /**
    * Starts the GDM Server
    */
    public static void main(String[] args) {
        SpringApplication.run(GdmServer.class, args);
    }
   
    // -------------------------------------------------------
    // Spring bean configurations
    // -------------------------------------------------------
   
    @Bean
    GisConfig gisConfig() {
        return new GisConfig();
    }
   
    @Bean
    PlatformTransactionManager transactionManager() throws SystemException {
        JtaTransactionManager manager = new JtaTransactionManager();
        manager.setTransactionManager( atomikosUserTransactionManager() );
        manager.setUserTransaction  ( atomikosUserTransaction() );
        manager.setAllowCustomIsolationLevels(true);
        return manager;
    }
   
    @Bean(initMethod = "init", destroyMethod = "close")
    UserTransactionManager atomikosUserTransactionManager() throws SystemException {
        UserTransactionManager manager = new UserTransactionManager();
        manager.setStartupTransactionService(true);
        manager.setForceShutdown(false);
        manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
        return manager;
    }
   
    @Bean
    UserTransaction atomikosUserTransaction() {
        return new UserTransactionImp();
    }
   
    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosDataSourceBean atomikosJdbcConnectionFactory() {
        PGXADataSource pgXADataSource = new PGXADataSource();
        pgXADataSource.setUrl( gisConfig.getGdbUrl() );
        pgXADataSource.setUser( gisConfig.getGdbUser() );
        pgXADataSource.setPassword( gisConfig.getGdbPassword() );
   
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(pgXADataSource);
        xaDataSource.setUniqueResourceName("gdb");
        xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
        return xaDataSource;
    }
   
    @Bean
    DSLContext dslContext() {
        DSLContext dslContext = DSL.using(atomikosJdbcConnectionFactory(), SQLDialect.POSTGRES);
        return dslContext;
    }

    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory();
        activeMQXAConnectionFactory.setBrokerURL( gisConfig.getMomBrokerUrl() );
       
        AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
        atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
        atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
        atomikosConnectionFactoryBean.setLocalTransactionMode(false);
        return atomikosConnectionFactoryBean;
    }
   
    @Bean
    DefaultMessageListenerContainer queueWrapperGDM() throws SystemException {
        DefaultMessageListenerContainer messageSource = new DefaultMessageListenerContainer();
        messageSource.setTransactionManager( transactionManager() );
        messageSource.setConnectionFactory( atomikosJmsConnectionFactory() );
        messageSource.setSessionTransacted(true);
        messageSource.setConcurrentConsumers(1);
        messageSource.setReceiveTimeout( gisConfig.getMomQueueGdmTimeoutReceive() );
        messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
        messageSource.setMessageListener( context.getBean("portSIQueue") );
        return messageSource;
    }
   
    @Bean
    JmsTemplate queueWrapperLIMS() {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
        jmsTemplate.setDefaultDestinationName( gisConfig.getMomQueueLimsName() );
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }
 
}

------------------------------------------------------------
package com.sm.gis.gdm.ports;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;

@Component
public class PortSIQueue implements MessageListener {
   
    @Autowired
    ConfigurableApplicationContext  context;
    @Autowired
    GisMessageMarshaler            queueMessageMashaler;
    @Autowired
    Kernel                          kernel;
   
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public void onMessage(Message jmsMessage) {
       
        TextMessage jmsTextMessage = (TextMessage) jmsMessage;
       
        // Extract JMS message body...
        String jmsPayload = "";
        try {
            jmsPayload = jmsTextMessage.getText();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
       
        // Marshal XML text to object...
        Object gisMessage = queueMessageMashaler.toObject(jmsPayload);
       
        kernel.receiveCreateGenomicTestOrderInGIS( (CreateGenomicTestOrderInGIS) gisMessage );
    }

}

------------------------------------------------------------
package com.sm.gis.gdm.kernel;

import org.jooq.DSLContext;
import org.jooq.impl.DSL;


/**
 * Worker bean invoked by the JMS listener; writes the contents of a received
 * message to the database through the injected jOOQ {@code DSLContext}.
 * The listing is abridged: {@code <snip>} marks omitted code and
 * {@code .set(...)} stands in for the actual column assignments.
 */
@Component
public class Kernel {

    @Autowired
    ConfigurableApplicationContext  context;
    // jOOQ context used for both inserts below.
    @Autowired
    DSLContext                      dslContext;

<snip>
    /**
     * Persists the given order message with two consecutive inserts, first into
     * {@code table1} and then into {@code table2}. The method carries no
     * transaction annotation of its own.
     *
     * @param message the order message whose data is written
     */
    public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {

            // First insert; per the scenario described in the post, this is the
            // row that stays committed after the second insert fails.
            dslContext.insertInto(table1)
                .set(...)
                .set(...)
            .execute();

            // Second insert; per the post, this one fails with a duplicate-key
            // PSQLException on "table2_PK".
            dslContext.insertInto(table2)
                .set(...)
                .set(...)
            .execute();
    }
<snip>
}

------------------------------------------------------------
Jim D'Augustine Send private email
Friday, September 25, 2015
 
 
The resolution to this issue can be found at...
https://groups.google.com/forum/#!topic/jooq-user/geYotcwEcRM
Jim D'Augustine Send private email
Wednesday, September 30, 2015
 
 

This topic is archived. No further replies will be accepted.

Other recent topics