Atomikos Forum |
|
Database writes are not rolling back as I expected.
I've spent many hours reading software documentation and web postings. I have not been able to resolve the issue. I'm hoping you folks can help me. Scenario . My application pulls a message from a queue, extracts data from the message, and writes it to a database. . The method that writes to the database does 2 SQL inserts. . The second insert gets an exception: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "table2_PK" . However, the first insert is still getting committed to the database. Relevant Software . spring-boot 1.2.5.RELEASE . atomikos-util 3.9.3 (from spring-boot-starter-jta-atomikos 1.2.5.RELEASE) . activemq-client 5.1.2 . jooq 3.6.2 . postgresql 9.4-1201-jdbc41 Application Code I've pasted the relevant parts of my code below. 1. GdmServer - my "server" class, which also declares Spring bean configurations 2. PortSIQueue - my JMS MessageListener class 3. Kernel - my worker class, a Spring bean invoked by my MessageListener, i.e. code that writes to database I'd appreciate any help anyone can offer. Thanks ------------------------------------------------------------ package com.sm.gis.gdm; import javax.transaction.SystemException; import javax.transaction.UserTransaction; import org.apache.activemq.ActiveMQXAConnectionFactory; import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.jooq.impl.DSL; import org.postgresql.xa.PGXADataSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.listener.DefaultMessageListenerContainer; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.jta.JtaTransactionManager; import com.atomikos.icatch.jta.UserTransactionImp; import com.atomikos.icatch.jta.UserTransactionManager; import com.atomikos.jms.AtomikosConnectionFactoryBean; import com.sm.gis.config.GisConfig; @SpringBootApplication @EnableJms @EnableTransactionManagement public class GdmServer { @Autowired ConfigurableApplicationContext context; @Autowired GisConfig gisConfig; /** * Starts the GDM Server */ public static void main(String[] args) { SpringApplication.run(GdmServer.class, args); } // ------------------------------------------------------- // Spring bean configurations // ------------------------------------------------------- @Bean GisConfig gisConfig() { return new GisConfig(); } @Bean PlatformTransactionManager transactionManager() throws SystemException { JtaTransactionManager manager = new JtaTransactionManager(); manager.setTransactionManager( atomikosUserTransactionManager() ); manager.setUserTransaction ( atomikosUserTransaction() ); manager.setAllowCustomIsolationLevels(true); return manager; } @Bean(initMethod = "init", destroyMethod = "close") UserTransactionManager atomikosUserTransactionManager() throws SystemException { UserTransactionManager manager = new UserTransactionManager(); manager.setStartupTransactionService(true); manager.setForceShutdown(false); manager.setTransactionTimeout( gisConfig.getTxnTimeout() ); return manager; } @Bean UserTransaction atomikosUserTransaction() { return new UserTransactionImp(); } @Bean(initMethod = "init", destroyMethod = "close") AtomikosDataSourceBean atomikosJdbcConnectionFactory() { PGXADataSource pgXADataSource = new PGXADataSource(); pgXADataSource.setUrl( gisConfig.getGdbUrl() ); pgXADataSource.setUser( gisConfig.getGdbUser() ); pgXADataSource.setPassword( gisConfig.getGdbPassword() ); AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(pgXADataSource); xaDataSource.setUniqueResourceName("gdb"); xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() ); return xaDataSource; } @Bean DSLContext dslContext() { DSLContext dslContext = DSL.using(atomikosJdbcConnectionFactory(), SQLDialect.POSTGRES); return dslContext; } @Bean(initMethod = "init", destroyMethod = "close") AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() { ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(); activeMQXAConnectionFactory.setBrokerURL( gisConfig.getMomBrokerUrl() ); AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean(); atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker"); atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory); atomikosConnectionFactoryBean.setLocalTransactionMode(false); return atomikosConnectionFactoryBean; } @Bean DefaultMessageListenerContainer queueWrapperGDM() throws SystemException { DefaultMessageListenerContainer messageSource = new DefaultMessageListenerContainer(); messageSource.setTransactionManager( transactionManager() ); messageSource.setConnectionFactory( atomikosJmsConnectionFactory() ); messageSource.setSessionTransacted(true); messageSource.setConcurrentConsumers(1); messageSource.setReceiveTimeout( gisConfig.getMomQueueGdmTimeoutReceive() ); messageSource.setDestinationName( gisConfig.getMomQueueGdmName() ); messageSource.setMessageListener( context.getBean("portSIQueue") ); return messageSource; } @Bean JmsTemplate queueWrapperLIMS() { JmsTemplate jmsTemplate = new JmsTemplate(); jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() ); jmsTemplate.setDefaultDestinationName( gisConfig.getMomQueueLimsName() ); jmsTemplate.setSessionTransacted(true); return jmsTemplate; } } ------------------------------------------------------------ package com.sm.gis.gdm.ports; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import com.sm.gis.gdm.kernel.Kernel; import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler; import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS; @Component public class PortSIQueue implements MessageListener { @Autowired ConfigurableApplicationContext context; @Autowired GisMessageMarshaler queueMessageMashaler; @Autowired Kernel kernel; @Override @Transactional(rollbackFor = {Throwable.class}) public void onMessage(Message jmsMessage) { TextMessage jmsTextMessage = (TextMessage) jmsMessage; // Extract JMS message body... String jmsPayload = ""; try { jmsPayload = jmsTextMessage.getText(); } catch (JMSException e) { throw new RuntimeException(e); } // Marshal XML text to object... Object gisMessage = queueMessageMashaler.toObject(jmsPayload); kernel.receiveCreateGenomicTestOrderInGIS( (CreateGenomicTestOrderInGIS) gisMessage ); } } ------------------------------------------------------------ package com.sm.gis.gdm.kernel; import org.jooq.DSLContext; import org.jooq.impl.DSL; @Component public class Kernel { @Autowired ConfigurableApplicationContext context; @Autowired DSLContext dslContext; <snip> public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) { dslContext.insertInto(table1) .set(...) .set(...) .execute(); dslContext.insertInto(table2) .set(...) .set(...) .execute(); } <snip> } ------------------------------------------------------------
The resolution to this issue can be found at...
https://groups.google.com/forum/#!topic/jooq-user/geYotcwEcRM |