Atomikos Forum

MQ 7.5: receiving a message times out

A minimal test program tries to receive a message via JMS from MQ 7.5. Although the queue contains messages, reading a message times out. If the timeout passed to messageConsumer.receive(timeout) is less than ~10s, the transaction completes successfully (but without a received message); otherwise it fails with the stack trace below.

The test program is able to send messages successfully (i.e. connection to MQ works in general), and a non-XA variant of the consumer works (i.e. the code is not completely wrong).

The MQXAQueueConnectionFactory is constructed programmatically (no JNDI).

Setup:
 - Java 7.0.25
 - WebSphere MQ 7.5.0.2 (client and server)
 - OS: RHEL 6.4 (64bit) or Win 32bit, same behaviour
 - transactions-osgi-3.9.2.jar

Exception in thread "main" javax.transaction.RollbackException: Prepare: NO vote
        at com.atomikos.icatch.jta.TransactionImp.rethrowAsJtaRollbackException(TransactionImp.java:66)
        at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:206)
        at com.atomikos.icatch.jta.TransactionManagerImp.commit(TransactionManagerImp.java:436)
        at com.atomikos.icatch.jta.UserTransactionManager.commit(UserTransactionManager.java:177)
        at AtomikosMinimalTest.produceOrConsumeMessage(AtomikosMinimalTest.java:83)
        at AtomikosMinimalTest.main(AtomikosMinimalTest.java:108)
Caused by: com.atomikos.icatch.RollbackException: Prepare: NO vote
        at com.atomikos.icatch.imp.ActiveStateHandler.prepare(ActiveStateHandler.java:225)
        at com.atomikos.icatch.imp.CoordinatorImp.prepare(CoordinatorImp.java:681)
        at com.atomikos.icatch.imp.CoordinatorImp.terminate(CoordinatorImp.java:970)
        at com.atomikos.icatch.imp.CompositeTerminatorImp.commit(CompositeTerminatorImp.java:82)
        at com.atomikos.icatch.imp.CompositeTransactionImp.commit(CompositeTransactionImp.java:336)
        at com.atomikos.icatch.jta.TransactionImp.commit(TransactionImp.java:190)
        ... 4 more


Source Code:


import java.util.Locale;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XAConnectionFactory;

import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.ibm.mq.jms.MQXAQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;


public class AtomikosMinimalTest {

    static final int DEFAULT_POOL_SIZE = 5;
    static final long DEFAULT_RECEIVE_TIMEOUT = 13000;
    
    static AtomikosConnectionFactoryBean cfb = null;
    static UserTransactionManager utm = null;
    
    static XAConnectionFactory getXAConnectionFactory(String hostname, int port, String queueManager, String channel) {
        MQXAQueueConnectionFactory mqConnectionFactory = new MQXAQueueConnectionFactory();

        try {
            mqConnectionFactory.setHostName(hostname);
            mqConnectionFactory.setPort(port);
            mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            mqConnectionFactory.setQueueManager(queueManager);
            mqConnectionFactory.setChannel(channel);
        } catch (JMSException e) {
            System.err.println("failed to set up MQ connection factory");
            e.printStackTrace();
            return null;
        }
           
        return mqConnectionFactory;
    }

    static void initAtomikosConnectionFactoryBean(String hostname, int port, String queueManager, String channel) {
        cfb = new AtomikosConnectionFactoryBean();
        String uniqueResourceName = hostname.length() > 28 ? hostname.substring(0,28) : hostname;
        uniqueResourceName += ":" + port + "/" + queueManager;
        uniqueResourceName = uniqueResourceName.toLowerCase();
        cfb.setUniqueResourceName(uniqueResourceName);
        cfb.setXaConnectionFactory(getXAConnectionFactory(hostname, port, queueManager, channel));
        cfb.setPoolSize(DEFAULT_POOL_SIZE);
    }

    static void produceMessage(Session session, Destination destination) throws JMSException {
        MessageProducer messageProducer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("XXX");
        messageProducer.send(message);
    }

    static void consumeMessage(Session session, Destination destination) throws JMSException {
        MessageConsumer messageConsumer = session.createConsumer(destination);
        TextMessage message = (TextMessage)messageConsumer.receive(DEFAULT_RECEIVE_TIMEOUT);
        if (message == null) {
            System.out.println("timeout");
        } else {
            System.out.println(message.getText());
        }
    }

    static void produceOrConsumeMessage(String queue, String mode) throws Exception {
        utm = new UserTransactionManager();
        utm.init();
        utm.begin();
        Connection conn = cfb.createConnection();
        Session session = conn.createSession(true,0);
        Destination destination = session.createQueue(queue);
        if ("p".equals(mode)) {
            produceMessage(session, destination);
        } else {
            consumeMessage(session, destination);
        }
        session.close();
        conn.close();
        utm.commit();
        cfb.close();
        utm.close();
       
    }

    static void usage() {
        System.out.println("usage: AMT p|c (p=producer, c=consumer)");
        System.exit(1);
    }
   
    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            usage();
        }
        String mode = args[0];
        if (!("p".equals(mode) || "c".equals(mode))) {
            usage();
        }
        String hostname = "10.192.84.20";
        int port = 1531;
        String queueManager = "INPFRATIC1";
        String channel = "P1.FI.ODS.INPFRATIC1";
        String queue = "P0.UB.ODS.EXPORTODS.02";
        initAtomikosConnectionFactoryBean(hostname, port, queueManager, channel);
        produceOrConsumeMessage(queue,mode);
    }
}
Alexander Frink Send private email
Monday, March 17, 2014
 
 
I found the problem here: a call to conn.start() was missing. Oddly, this does not seem to be needed for non-XA message consumption.
Alexander Frink Send private email
Thursday, March 20, 2014
 
 

This topic is archived. No further replies will be accepted.

Other recent topics