Atomikos Forum

ActiveMQ durable topic subscriber issue

I've written a topic listener that works fine as a non-durable topic subscriber. I want to convert it to a durable topic subscriber, but am running into problems.

This portion of the spring configuration sets up the topic listener:

    <bean id="xaFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
<!--        <property name="clientID" value="xaEventListenerConnection" />-->
    </bean>

    <bean id="atomikosConnectionFactoryBean"
        class="com.atomikos.jms.AtomikosConnectionFactoryBean"
        init-method="init" destroy-method="close">
          <property name="uniqueResourceName" value="ACTIVEMQ_BROKER"/>
          <property name="xaConnectionFactory" ref="xaFactory" />
          <property name="maxPoolSize" value="1"/>
    </bean>

    <bean id="eventHandler" class="com.med.XAEventHandler">
        <property name="eventRepository" ref="eventRepository"/>
    </bean>

    <!-- a class that implements javax.jms.MessageListener -->
    <bean id="eventListener" class="com.med.XAEventListener">
        <property name="eventHandler" ref="eventHandler"/>
    </bean>
    
    <!--  initialize the destination -->
    <bean id="eventTopic" class="org.apache.activemq.command.ActiveMQTopic" >
          <constructor-arg value="ChangeDataEvents.Topic" />
    </bean> 
   
    <!-- Configure the session pool for JMS.
    This will also manage the transactions for incoming messages. -->
    <bean id="eventListenerContainer"
        class="com.atomikos.jms.extra.MessageDrivenContainer"
        init-method="start" destroy-method="stop">
        <property name="atomikosConnectionFactoryBean" ref="atomikosConnectionFactoryBean" />
        <property name="transactionTimeout"><value>120</value></property>
        <property name="destination"><ref bean="eventTopic"/></property>
        <property name="messageListener"><ref bean="eventListener"/></property>
        <property name="noLocal" value="true"/>
        <property name="poolSize" value="50"/>
<!--        <property name="subscriberName" value="xaEventListener"/>-->
    </bean>

If I uncomment the clientID and subscriberName properties, I get the following error:

WARN-[2011-01-07 22:39:20,262][Thread-5][Error delegating 'start' call to JMS driver
javax.jms.InvalidClientIDException: Broker: localhost - Client: xaEventListenerConnection already connected from /127.0.0.1:2299
    at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:243)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:85)
    at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:78)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:85)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:85)
    at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:91)
    at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:696)
    at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:83)
    at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:137)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:311)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
    at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
    at java.lang.Thread.run(Unknown Source)
][-atomikos][com.atomikos.diagnostics.Slf4jConsole.println(Slf4jConsole.java:107)]
WARN-[2011-01-07 22:39:20,278][Thread-5][MessageConsumerSession: Error in JMS thread

How do I prevent this?
Jane Eisenstein
Saturday, January 08, 2011
 
 
AFAIK this is not possible at the moment without a small patch.
The problem is that the JmsTransactionResource that gets created refreshes its state the first time it is used:

Code does this:

conn_ = factory_.createXAConnection ();
XASession session = conn_.createXASession ();

At this point a connection with clientID "XX" already exists (the clientID is taken from the ActiveMQ XA factory). The MessageDrivenContainer thread then tries to create another connection with the same clientID "XX", and this fails.

The solution is to NOT set the clientID on the factory, so that initialization works, and to set it on the connection used by the MessageDrivenContainer instead: after applying the patch, use the clientID property on the MessageDrivenContainer.
This works for me, and IMHO it is the "right" approach: ActiveMQ also recommends setting the clientID on the connection rather than on the factory.

I made a patch against 3.7.0M5 to get this running; it can be found here:

http://fachschaft.imn.htwk-leipzig.de/~tkrah/atomikos/atomikos-jms-durable.patch

The only thing I am not sure about: you have to use a separate JMS connection factory bean with pool size 1 to get this running. The connection that has the clientID set must not go back into the pool when it is used or closed and then be reused, e.g. for a JmsTemplate connection, which should use a random clientID.
I don't yet know a good way to force the connection to be destroyed instead of returned to the pool (or returned and then removed), so for now every container has its own factory bean in my configuration.
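
A rough sketch of what the reconfigured Spring beans might look like, assuming the patch adds a clientID property to MessageDrivenContainer as described above. The bean id durableConnectionFactoryBean and the resource name ACTIVEMQ_BROKER_DURABLE are made up for illustration; the eventTopic and eventListener beans are the ones from the original configuration. poolSize is left out because this thread does not establish which value works with a durable subscription.

```xml
<!-- No clientID on the ActiveMQ factory, so the connection opened
     during initialization does not claim it -->
<bean id="xaFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
</bean>

<!-- Separate factory bean with pool size 1, used only by the durable
     subscriber's container (bean id and resource name are hypothetical) -->
<bean id="durableConnectionFactoryBean"
    class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="ACTIVEMQ_BROKER_DURABLE"/>
    <property name="xaConnectionFactory" ref="xaFactory"/>
    <property name="maxPoolSize" value="1"/>
</bean>

<bean id="eventListenerContainer"
    class="com.atomikos.jms.extra.MessageDrivenContainer"
    init-method="start" destroy-method="stop">
    <property name="atomikosConnectionFactoryBean" ref="durableConnectionFactoryBean"/>
    <property name="transactionTimeout" value="120"/>
    <property name="destination" ref="eventTopic"/>
    <property name="messageListener" ref="eventListener"/>
    <property name="noLocal" value="true"/>
    <!-- ASSUMPTION: clientID exists on the container only with the patch applied -->
    <property name="clientID" value="xaEventListenerConnection"/>
    <property name="subscriberName" value="xaEventListener"/>
</bean>
```

Other users of the broker, such as a JmsTemplate, would then get their own AtomikosConnectionFactoryBean without any clientID.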
Torsten Krah
Saturday, January 08, 2011
 
 
I'll give this a try.

Will this change get into an official Atomikos Transaction Essentials release?
Jane Eisenstein
Monday, January 10, 2011
 
 
We have created a case for this.
Guy Pardon
Monday, January 10, 2011
 
 
After applying the patch to the transactions-jms source code and reconfiguring my Spring beans as suggested, the transactional durable subscriber client works as expected. It shows up in the ActiveMQ Console Subscribers tab as either active or offline, depending on whether or not it's running. It is able to receive messages that were published while it was offline.

However, the messages it receives are not dequeued from the topic. After active non-durable subscribers receive a message, the message is dequeued by ActiveMQ. I don't know whether this difference is expected for topics having durable subscribers or a defect in either ActiveMQ or Atomikos.
Jane Eisenstein
Thursday, January 20, 2011
 
 
I've found bouncing the ActiveMQ server clears messages consumed by the durable subscriber from the topic. There are probably easier ways to accomplish this via ActiveMQ's configuration.
Jane Eisenstein
Friday, January 21, 2011
 
 
The last problem (messages not removed) seems like an ActiveMQ bug IMHO. With JTA/XA, received messages MUST be removed from the queue/topic upon commit...
Guy Pardon
Monday, January 24, 2011
 
 
I reconfigured ActiveMQ to use Oracle as the persistent store. Now I can see messages successfully published to a durable subscriber are soon removed from the backing store.

However, the ActiveMQ console shows them still in the topic queue. I'll report this defect to ActiveMQ.
Jane Eisenstein
Tuesday, January 25, 2011
 
 
Hi Jane, good to hear that it works for you now, apart from this ActiveMQ bug.
Could you please post the URI of the upstream bug report here, so that anyone else who hits this problem can track it? Thanks.
Torsten Krah
Tuesday, February 22, 2011
 
 

This topic is archived. No further replies will be accepted.
