Atomikos Forum
I've written a topic listener that works fine as a non-durable topic subscriber. I want to convert it to a durable topic subscriber, but am running into problems.
This portion of the Spring configuration sets up the topic listener:

<bean id="xaFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
    <!-- <property name="clientID" value="xaEventListenerConnection" />-->
</bean>

<bean id="atomikosConnectionFactoryBean" class="com.atomikos.jms.AtomikosConnectionFactoryBean" init-method="init" destroy-method="close">
    <property name="uniqueResourceName" value="ACTIVEMQ_BROKER"/>
    <property name="xaConnectionFactory" ref="xaFactory" />
    <property name="maxPoolSize" value="1"/>
</bean>

<bean id="eventHandler" class="com.med.XAEventHandler">
    <property name="eventRepository" ref="eventRepository"/>
</bean>

<!-- a class that implements javax.jms.MessageListener -->
<bean id="eventListener" class="com.med.XAEventListener">
    <property name="eventHandler" ref="eventHandler"/>
</bean>

<!-- initialize the destination -->
<bean id="eventTopic" class="org.apache.activemq.command.ActiveMQTopic" >
    <constructor-arg value="ChangeDataEvents.Topic" />
</bean>

<!-- Configure the session pool for JMS. This will also manage the transactions for incoming messages. -->
<bean id="eventListenerContainer" class="com.atomikos.jms.extra.MessageDrivenContainer" init-method="start" destroy-method="stop">
    <property name="atomikosConnectionFactoryBean" ref="atomikosConnectionFactoryBean" />
    <property name="transactionTimeout"><value>120</value></property>
    <property name="destination"><ref bean="eventTopic"/></property>
    <property name="messageListener"><ref bean="eventListener"/></property>
    <property name="noLocal" value="true"/>
    <property name="poolSize" value="50"/>
    <!-- <property name="subscriberName" value="xaEventListener"/>-->
</bean>

If I uncomment the clientID and subscriberName properties, I get the following error:

WARN-[2011-01-07 22:39:20,262][Thread-5][Error delegating 'start' call to JMS driver
javax.jms.InvalidClientIDException: Broker: localhost - Client: xaEventListenerConnection already connected from /127.0.0.1:2299
    at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:243)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:85)
    at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:78)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:85)
    at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:85)
    at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:91)
    at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:696)
    at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:83)
    at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:137)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:311)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
    at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:228)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:220)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:202)
    at java.lang.Thread.run(Unknown Source)
][-atomikos][com.atomikos.diagnostics.Slf4jConsole.println(Slf4jConsole.java:107)]
WARN-[2011-01-07 22:39:20,278][Thread-5][MessageConsumerSession: Error in JMS thread

How do I prevent this?
AFAIK this is not possible at the moment without a small patch.
The problem is that the JmsTransactionResource refreshes its state the first time it is used. The code does this:

conn_ = factory_.createXAConnection ();
XASession session = conn_.createXASession ();

At this point there is already a connection with clientID "XX" (the clientID is taken from the ActiveMQ XA factory). When the MessageDrivenContainer thread then tries to create another connection with clientID "XX", it fails.

The solution is to NOT set the clientID on the factory, so that initialization works, and to set it on the connection used by the MessageDrivenContainer instead: use the clientID property on the MessageDrivenContainer after applying the patch. It works for me, and IMHO it is the "right" thing; ActiveMQ recommends this too (setting the clientID on the connection and not on the factory).

I made a patch for 3.7.0M5 to get this running; it can be found here: http://fachschaft.imn.htwk-leipzig.de/~tkrah/atomikos/atomikos-jms-durable.patch

The only thing I am not sure about: you have to use a separate JMS connection factory bean with pool size 1 to get it running, because a connection that has the clientID set should not go back into the pool when it is closed and then be reused, e.g. for a JmsTemplate connection, which should use a random clientID. I don't yet know a good way to force destruction instead of returning the connection to the pool (or returning and then removing it), so for now every container has its own factory bean in my configuration.
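To make the cause of the InvalidClientIDException easier to see, here is a minimal, self-contained sketch of the collision. It does not use the real ActiveMQ or Atomikos APIs: ToyBroker, ToyFactory and ToyConnection are hypothetical stand-ins, and the only behaviour assumed is the one reported in the stack trace above, namely that the broker rejects a second connection that presents a clientID which is already connected.

```java
import java.util.HashSet;
import java.util.Set;

public class ClientIdCollisionSketch {

    /** Hypothetical toy broker: remembers which client IDs are currently connected. */
    static class ToyBroker {
        private final Set<String> connectedClientIds = new HashSet<>();

        void register(String clientId) {
            // Set.add returns false when the ID is already present.
            if (!connectedClientIds.add(clientId)) {
                throw new IllegalStateException("Client: " + clientId + " already connected");
            }
        }
    }

    /** Hypothetical toy connection: gets its client ID from the factory or via setClientId. */
    static class ToyConnection {
        private final ToyBroker broker;

        ToyConnection(ToyBroker broker, String presetClientId) {
            this.broker = broker;
            if (presetClientId != null) {
                setClientId(presetClientId);
            }
        }

        void setClientId(String clientId) {
            broker.register(clientId);
        }
    }

    /** Hypothetical toy factory: stamps every connection with the same preset ID, if any. */
    static class ToyFactory {
        private final ToyBroker broker;
        private final String presetClientId;

        ToyFactory(ToyBroker broker, String presetClientId) {
            this.broker = broker;
            this.presetClientId = presetClientId;
        }

        ToyConnection createConnection() {
            return new ToyConnection(broker, presetClientId);
        }
    }

    /** clientID on the factory: the initialization connection and the listener connection share it. */
    static boolean clientIdOnFactoryCollides() {
        ToyFactory factory = new ToyFactory(new ToyBroker(), "xaEventListenerConnection");
        factory.createConnection(); // opened while the resource initializes
        try {
            factory.createConnection(); // opened by the listener container
            return false;
        } catch (IllegalStateException alreadyConnected) {
            return true;
        }
    }

    /** clientID only on the listener's connection: the initialization connection stays anonymous. */
    static boolean clientIdOnConnectionCollides() {
        ToyFactory factory = new ToyFactory(new ToyBroker(), null);
        factory.createConnection(); // opened while the resource initializes, no clientID
        try {
            ToyConnection listenerConnection = factory.createConnection();
            listenerConnection.setClientId("xaEventListenerConnection");
            return false;
        } catch (IllegalStateException alreadyConnected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(clientIdOnFactoryCollides());
        System.out.println(clientIdOnConnectionCollides());
    }
}
```

Running main should print true and then false: with the ID on the factory the second connection is rejected, while with the ID set only on the listener's connection both connections can coexist. The toy also hints at why a pooled connection carrying the clientID is awkward to hand out for other uses.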
After applying the patch to the transactions-jms source code and reconfiguring my Spring beans as suggested, the transactional durable subscriber client works as expected. It shows up in the ActiveMQ Console Subscribers tab as either active or offline, depending on whether or not it's running. It is able to receive messages that were published while it was offline.
However, the messages it receives are not dequeued from the topic. After active non-durable subscribers receive a message, the message is dequeued by ActiveMQ. I don't know whether this difference is expected for topics having durable subscribers or a defect in either ActiveMQ or Atomikos.
I reconfigured ActiveMQ to use Oracle as the persistent store. Now I can see messages successfully published to a durable subscriber are soon removed from the backing store.
However, the ActiveMQ console still shows them in the topic queue. I'll report this defect to ActiveMQ.