Atomikos Forum

Atomikos + Camel + ActiveMQ

Has anyone run into this issue when using the combination of Atomikos + Camel + ActiveMQ? I am using this combo to consume messages from a queue in a transacted fashion, and it seems to work well. The problem is that I now need to turn on advisory messages in ActiveMQ. After I did that, I noticed that connections for all the queues are constantly being recreated. This is evidenced by the flooding of the ActiveMQ.Advisory.Consumer.Queue topics. It is also apparent in the DEBUG logging: the consumer continually creates a connection, opens a transaction, commits it, and closes the connection. This happens without any actual application-generated messages. None of the other, non-transacted queues/topics have this issue. I have read in a few other posts that connection pooling and caching can alleviate this. It appears that I shouldn't use caching, and I believe I'm already pooling connections. I am using this config:

    <bean id="txq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="configuration" ref="txJmsConfig" />
    </bean>
    <bean id="txJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
        <property name="connectionFactory" ref="atomikosConnectionFactory" />
        <property name="concurrentConsumers" value="1" />
        <property name="transacted" value="true" />
        <property name="maxConcurrentConsumers" value="${consumers.concurrent.max}" />
        <property name="transactionManager" ref="jtaTransactionManager" />
        <property name="cacheLevelName" value="CACHE_NONE" />
    </bean>
    <bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
        init-method="init" destroy-method="close">
        <property name="uniqueResourceName">
            <value>XA-JMS-ATOMIKOS</value>
        </property>
        <property name="localTransactionMode">
            <value>false</value>
        </property>
        <property name="poolSize">
            <value>4</value>
        </property>
        <property name="xaConnectionFactory">
            <ref bean="xaJmsConnectionFactory" />
        </property>
    </bean>
    <bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL"
            value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
    </bean>

It uses the AtomikosConnectionFactoryBean which I thought implemented pooling. Maybe I'm wrong on that? I'd love to hear if anyone else is in this boat with me...
Ryan Stanley
Monday, January 14, 2013
 
 
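For transacted consumers, the JMS receive() is invoked within an open XA transaction, so the receive() call needs a timeout (default 1s). When no message arrives within that time, the transaction is completed and receive() is restarted in a new one, as otherwise the transaction would time out. That polling cycle runs even when the queue is empty.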

You can change the timeout via "receiveTimeout" on Camel's JmsConfiguration.
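
As a minimal sketch of that suggestion, assuming the txJmsConfig bean from the original post: receiveTimeout is given in milliseconds, and the 30000 below is only an illustrative value, not a recommendation. It would presumably have to stay below the JTA transaction timeout configured for Atomikos, or the transaction could expire before receive() returns.

```xml
<bean id="txJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="atomikosConnectionFactory" />
    <property name="concurrentConsumers" value="1" />
    <property name="transacted" value="true" />
    <property name="maxConcurrentConsumers" value="${consumers.concurrent.max}" />
    <property name="transactionManager" ref="jtaTransactionManager" />
    <property name="cacheLevelName" value="CACHE_NONE" />
    <!-- Illustrative value (ms): a longer receive() wait means fewer
         connect/commit/close cycles on an idle queue. Assumed to be
         shorter than the JTA transaction timeout. -->
    <property name="receiveTimeout" value="30000" />
</bean>
```

A longer timeout should reduce how often the idle cycle repeats, and with it the advisory traffic, but it does not remove the cycle.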
Martin Lichtin
Wednesday, March 13, 2013
 
 

This topic is archived. No further replies will be accepted.