Atomikos Forum
I have a Spring DefaultMessageListenerContainer that is configured like this:

    <bean id="gatewayMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" depends-on="transactionManager">
        <property name="connectionFactory" ref="queueConnectionFactoryBean"/>
        <property name="transactionManager" ref="transactionManager"/>
        <property name="destination" ref="gatewayMessageDestination"/>
        <property name="sessionTransacted" value="true"/>
        <property name="maxConcurrentConsumers" value="${das-eai.jms.listener.max.concurrent.consumers}"/>
        <property name="concurrentConsumers" value="${das-eai.jms.listener.min.concurrent.consumers}"/>
        <property name="receiveTimeout" value="${das-eai.jms.listener.receive.timeout}"/>
        <property name="recoveryInterval" value="${das-eai.jms.listener.recovery.interval}"/>
        <property name="autoStartup" value="true"/>
        <!-- WARNING!! This is required to get redelivery to work on message exception/rollback -->
        <property name="cacheLevelName" value="CACHE_CONSUMER"/>
    </bean>

My Atomikos config looks like this:

    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
        <property name="forceShutdown" value="false"/>
    </bean>

    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.J2eeUserTransaction">
        <property name="transactionTimeout" value="${das-eai.atomikos.transaction.timeout}"/>
    </bean>

    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" depends-on="atomikosTransactionManager,atomikosUserTransaction">
        <property name="transactionManager" ref="atomikosTransactionManager"/>
        <property name="userTransaction" ref="atomikosUserTransaction"/>
        <!-- This is required for Spring Batch to set a custom isolation level when it runs -->
        <property name="allowCustomIsolationLevels" value="true"/>
    </bean>

And my JMS config looks like this:

    <bean id="queueConnectionFactoryBean" class="org.springframework.jms.connection.SingleConnectionFactory" destroy-method="destroy">
        <constructor-arg>
            <bean class="com.atomikos.jms.QueueConnectionFactoryBean" init-method="init">
                <property name="xaQueueConnectionFactory" ref="queueConnectionFactoryXa"/>
            </bean>
        </constructor-arg>
        <property name="reconnectOnException" value="true"/>
    </bean>

    <bean id="queueConnectionFactoryXa" class="org.apache.activemq.ActiveMQXAConnectionFactory">
        <property name="brokerURL" value="${das-eai.activemq.url}"/>
        <!-- Set prefetch to 1 to prevent message rollback from preventing message redelivery.
             See http://activemq.apache.org/what-is-the-prefetch-limit-for.html for more information -->
        <property name="prefetchPolicy">
            <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
                <property name="queuePrefetch" value="${das-eai.jms.queue.prefetch}"/>
            </bean>
        </property>
        <!-- TODO: make these configurable in properties file -->
        <property name="redeliveryPolicy">
            <bean class="org.apache.activemq.RedeliveryPolicy">
                <property name="initialRedeliveryDelay" value="${das-eai.jms.queue.redelivery-policy.initial.redelivery.delay}"/>
                <property name="maximumRedeliveries" value="${das-eai.jms.queue.redelivery-policy.max.redeliveries}"/>
                <property name="useExponentialBackOff" value="${das.eai.jms.queue.redelivery-policy.exponential.backoff}"/>
                <property name="backOffMultiplier" value="${das.eai.jms.queue.redelivery-policy.backoff.multiplier}"/>
            </bean>
        </property>
    </bean>

    <!-- Spring JMS Template -->
    <bean id="jmsHelper" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="queueConnectionFactoryBean"/>
        <property name="defaultDestination" ref="gatewayMessageDestination"/>
        <property name="sessionTransacted" value="true"/>
    </bean>

The problem I'm having is that, inside the transaction in which the DMLC receives a message, I try to send another message to another queue and keep getting exceptions:

    com.atomikos.datasource.ResourceException: XA resource 'someUniqueName': resume for XID '6461735F6561695F7765625F746D30303035383030303031:6461735F6561695F7765625F746D3537' raised -7: the XA resource has become unavailable
        at com.atomikos.datasource.xa.XAResourceTransaction.resume(XAResourceTransaction.java:654)
        at com.atomikos.jms.DefaultJtaMessageProducer.enlist(DefaultJtaMessageProducer.java:87)
        at com.atomikos.jms.DefaultJtaMessageProducer.sendToDefaultDestination(DefaultJtaMessageProducer.java:216)
        at com.atomikos.jms.DefaultJtaMessageProducer.send(DefaultJtaMessageProducer.java:171)
        at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:592)
        at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:569)
        at org.springframework.jms.core.JmsTemplate$3.doInJms(JmsTemplate.java:536)
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)
        at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
        at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:612)

Then the next time:

    org.springframework.jms.IllegalStateException: The Session is closed; nested exception is javax.jms.IllegalStateException: The Session is closed
        at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:279)
        at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:168)
        at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:469)
        at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:534)
        at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:612)

Then this:

    2011-05-19 17:37:43,187 [org.springframework.jms.listener.DefaultMessageListenerContainer] [gatewayMessageListenerContainer-6] WARN [DefaultMessageListenerContainer.java:821] Setup of JMS message listener invoker failed for destination 'queue://messages' - trying to recover. Cause: XA resource 'someUniqueName': resume for XID '6461735F6561695F7765625F746D30303035393030303031:6461735F6561695F7765625F746D3538' raised -7: the XA resource has become unavailable
    2011-05-19 17:37:43,188 [org.springframework.jms.connection.SingleConnectionFactory] [gatewayMessageListenerContainer-6] INFO [SingleConnectionFactory.java:291] Established shared JMS Connection: com.atomikos.jms.JtaQueueConnection@a7a93a
    2011-05-19 17:37:43,192 [org.springframework.jms.listener.DefaultMessageListenerContainer] [gatewayMessageListenerContainer-6] INFO [DefaultMessageListenerContainer.java:862] Successfully refreshed JMS Connection

Then this:

    2011-05-19 17:37:43,339 [org.springframework.jms.connection.SingleConnectionFactory] [ActiveMQ Connection Executor: tcp://localhost/127.0.0.1:61616] WARN [SingleConnectionFactory.java:301] Encountered a JMSException - resetting the underlying JMS Connection
    javax.jms.JMSException: Unmatched acknowledge: MessageAck {commandId = 19, responseRequired = false, ackType = 2, consumerId = ID:chudak-laptop2-57008-1305851811992-0:3:2:1, firstMessageId = ID:chudak-laptop2-54958-1305851857690-0:0:2:1:2, lastMessageId = ID:chudak-laptop2-54958-1305851857690-0:0:2:1:5, destination = queue://messages, transactionId = XID:1096044365:6461735f6561695f7765625f746d30303036313030303031:6461735f6561695f7765625f746d3630, messageCount = 2}; Could not find Message-ID ID:chudak-laptop2-54958-1305851857690-0:0:2:1:2 in dispatched-list (start of ack)
        at org.apache.activemq.broker.region.PrefetchSubscription.assertAckMatchesDispatched(PrefetchSubscription.java:436)
        at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:207)
        at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:383)
        at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:520)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:77)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:77)
        at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:77)
        at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:200)
        at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:87)
        at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:467)
        at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:309)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
        at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
        at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
        at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:217)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:219)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:201)
        at java.lang.Thread.run(Thread.java:636)

Any ideas? As far as I can tell, I have everything configured the same way as the examples.

FWIW I solved this problem by creating a second queue connection factory and using that for the jms template.
I have no idea why this was necessary, but I figured I'd try it since I saw the two-factory approach in the example here: default82df.html?community.6.668.11
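
For anyone hitting the same errors, the workaround described above might look roughly like the sketch below. This is only a guess at what "a second queue connection factory" means in this setup. The bean id `jmsTemplateConnectionFactoryBean` and the `resourceName` value are hypothetical and do not come from the original post, and the sketch assumes the second factory can reuse the existing `queueConnectionFactoryXa`. The listener container keeps using `queueConnectionFactoryBean` unchanged.

```xml
<!-- Hypothetical second connection factory, used only by the JmsTemplate for sending -->
<bean id="jmsTemplateConnectionFactoryBean" class="org.springframework.jms.connection.SingleConnectionFactory" destroy-method="destroy">
    <constructor-arg>
        <bean class="com.atomikos.jms.QueueConnectionFactoryBean" init-method="init">
            <!-- Assumption: give this factory a resource name distinct from the listener's factory -->
            <property name="resourceName" value="jmsTemplateResource"/>
            <property name="xaQueueConnectionFactory" ref="queueConnectionFactoryXa"/>
        </bean>
    </constructor-arg>
    <property name="reconnectOnException" value="true"/>
</bean>

<!-- Same JmsTemplate as before, now pointing at the second factory -->
<bean id="jmsHelper" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="jmsTemplateConnectionFactoryBean"/>
    <property name="defaultDestination" ref="gatewayMessageDestination"/>
    <property name="sessionTransacted" value="true"/>
</bean>
```

With this layout the listener and the template no longer share a single JMS connection, which is the only difference from the original configuration. Whether that is the actual root cause is not established in this thread.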