Atomikos Forum |
|
A minimal test program tries to receive a message via JMS from MQ 7.5. Although the queue contains messages, reading a message times out. If the timeout in messageConsumer.receive(timeout) is less than ~10s, the TX completes successfully (but without a received message); otherwise it fails with the stack trace below.
The test program is able to send messages successfully (i.e. the connection to MQ works in general), and a non-XA variant of the consumer works (i.e. the code is not completely wrong). The MQXAQueueConnectionFactory is constructed programmatically (no JNDI).
Setup:
- Java 7.0.25
- WebSphere MQ 7.5.0.2 (client and server)
- OS: RHEL 6.4 (64bit) or Win 32bit, same behaviour
- transactions-osgi-3.9.2.jar
Exception in thread "main" javax.transaction.RollbackException: Prepare: NO vote
    at com.atomikos.icatch.jta.TransactionImp.rethrowAsJtaRollbackException(TransactionImp.java:66)
    at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:206)
    at com.atomikos.icatch.jta.TransactionManagerImp.commit(TransactionManagerImp.java:436)
    at com.atomikos.icatch.jta.UserTransactionManager.commit(UserTransactionManager.java:177)
    at AtomikosMinimalTest.produceOrConsumeMessage(AtomikosMinimalTest.java:83)
    at AtomikosMinimalTest.main(AtomikosMinimalTest.java:108)
Caused by: com.atomikos.icatch.RollbackException: Prepare: NO vote
    at com.atomikos.icatch.imp.ActiveStateHandler.prepare(ActiveStateHandler.java:225)
    at com.atomikos.icatch.imp.CoordinatorImp.prepare(CoordinatorImp.java:681)
    at com.atomikos.icatch.imp.CoordinatorImp.terminate(CoordinatorImp.java:970)
    at com.atomikos.icatch.imp.CompositeTerminatorImp.commit(CompositeTerminatorImp.java:82)
    at com.atomikos.icatch.imp.CompositeTransactionImp.commit(CompositeTransactionImp.java:336)
    at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:190)
    ...
4 more Source Code: import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.XAConnectionFactory; import com.atomikos.icatch.jta.UserTransactionManager; import com.atomikos.jms.AtomikosConnectionFactoryBean; import com.ibm.mq.jms.MQXAQueueConnectionFactory; import com.ibm.msg.client.wmq.WMQConstants; public class AtomikosMinimalTest { static final int DEFAULT_POOL_SIZE = 5; static final long DEFAULT_RECEIVE_TIMEOUT = 13000; static AtomikosConnectionFactoryBean cfb = null; static UserTransactionManager utm = null; static XAConnectionFactory getXAConnectionFactory(String hostname, int port, String queueManager, String channel) { MQXAQueueConnectionFactory mqConnectionFactory = new MQXAQueueConnectionFactory(); try { mqConnectionFactory.setHostName(hostname); mqConnectionFactory.setPort(port); mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT); mqConnectionFactory.setQueueManager(queueManager); mqConnectionFactory.setChannel(channel); } catch (JMSException e) { System.err.println("failed to set up MQ connection factory"); e.printStackTrace(); return null; } return mqConnectionFactory; } static void initAtomikosConnectionFactoryBean(String hostname, int port, String queueManager, String channel) { cfb = new AtomikosConnectionFactoryBean(); String uniqueResourceName = hostname.length() > 28 ? 
hostname.substring(0,28) : hostname; uniqueResourceName += ":" + port + "/" + queueManager; uniqueResourceName = uniqueResourceName.toLowerCase(); cfb.setUniqueResourceName(uniqueResourceName); cfb.setXaConnectionFactory(getXAConnectionFactory(hostname, port, queueManager, channel)); cfb.setPoolSize(DEFAULT_POOL_SIZE); } static void produceMessage(Session session, Destination destination) throws JMSException { MessageProducer messageProducer = session.createProducer(destination); TextMessage message = session.createTextMessage("XXX"); messageProducer.send(message); } static void consumeMessage(Session session, Destination destination) throws JMSException { MessageConsumer messageConsumer = session.createConsumer(destination); TextMessage message = (TextMessage)messageConsumer.receive(DEFAULT_RECEIVE_TIMEOUT); if (message == null) { System.out.println("timeout"); } else { System.out.println(message.getText()); } } static void produceOrConsumeMessage(String queue, String mode) throws Exception { utm = new UserTransactionManager(); utm.init(); utm.begin(); Connection conn = cfb.createConnection(); Session session = conn.createSession(true,0); Destination destination = session.createQueue(queue); if ("p".equals(mode)) { produceMessage(session, destination); } else { consumeMessage(session, destination); } session.close(); conn.close(); utm.commit(); cfb.close(); utm.close(); } static void usage() { System.out.println("usage: AMT p|c (p=producer, c=consumer)"); System.exit(1); } public static void main(String[] args) throws Exception { if (args.length != 1) { usage(); } String mode = args[0]; if (!("p".equals(mode) || "c".equals(mode))) { usage(); } String hostname = "10.192.84.20"; int port = 1531; String queueManager = "INPFRATIC1"; String channel = "P1.FI.ODS.INPFRATIC1"; String queue = "P0.UB.ODS.EXPORTODS.02"; initAtomikosConnectionFactoryBean(hostname, port, queueManager, channel); produceOrConsumeMessage(queue,mode); } } |