【图灵学院】高并发分布式消息中间件技术ActiveMQ事务

发布时间:2021-04-23 13:38:25

ActiveMQ支持两种事务

JMS transactions – the commit() / rollback() methods on a Session (which is like doing commit() / rollback() on a JDBC connection)

XA Transactions – where the XASession acts as an XAResource by communicating with the Message Broker, rather like a JDBC Connection takes place in an XA transaction by communicating with the database.

在支持事务的session中,producer发送message时在message中带有transaction ID。broker收到message后判断是否有transaction ID,如果有就把message保存在transaction store中,等待commit或者rollback消息。所以ActiveMq的事务是针对broker而不是producer的,不管session是否commit,broker都会收到message。

如果producer发送模式选择了persistent,那么message过期后会进入死亡队列。在message进入死亡队列之前,ActiveMQ会删除message中的transaction ID,这样过期的message就不在事务中了,不会保存在transaction store中,会直接进入死亡队列。具体删除transaction ID的地方是在org.apache.activemq.util.BrokerSupport的doResend,将transaction ID保存在了originalTransactionID中,删除了transaction ID

事务

jms中事务分为生产者和消费者两块,消息的生产和消费不能包含在同一个事务中。

生产者:

在事务状态下进行发送操作,消息并未真正投递到中间件。而只有进行session.commit操作之后,消息才会发送到中间件,再转发到适当的消费者进行处理。如果是调用rollback操作,则表明,当前事务期间内所发送的消息都取消掉。

在支持事务的session中,producer发送message时在message中带有transactionID。broker收到message后判断是否有transactionID,如果有就把message保存在transaction store中,等待commit或者rollback消息。所以ActiveMq的事务是针对broker而不是producer的,不管session是否commit,broker都会收到message。如果producer发送模式选择了persistent,那么message过期后会进入死亡队列。在message进入死亡队列之前,ActiveMQ会删除message中的transaction ID,这样过期的message就不在事务中了,不会保存在transaction store中,会直接进入死亡队列。

消费者:

在Spring整合JMS的应用中,我们要进行本地的事务管理,只需要指定对应的监听容器的sessionTransacted属性为true。对于SessionAwareMessageListener,在接收到消息后发送一个返回消息时也处于同一事务下,但是对于其他操作如数据库访问等将不属于该事务控制。

<bean id=”jmsContainer”

class=”org.springframework.jms.listener.DefaultMessageListenerContainer”>

jta事务:

如果想要接收消息和数据库访问处于同一事务中,那么我们就可以配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如DefaultMessageListenerContainer)。要配置这样一个参与分布式事务管理的消息监听容器,我们可以配置一个JtaTransactionManager,当然底层的JMS ConnectionFactory需要能够支持分布式事务管理,并正确地注册我们的JtaTransactionManager。这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作。

当指定了transactionManager时,消息监听容器将忽略sessionTransacted的值。

持久化

默认持久化到文件中:

打开安装目录下的配置文件,注意这里使用的是kahaDB,是一个基于文件支持事务的消息存储器。以日志形式存储消息,消息索引以B-Tree结构存储。在D:ActiveMQapache-activemqconfactivemq.xml中会发现默认的配置项:

消息存储在基于文件的数据日志中。如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。

消息文件的位置索引存储在内存中,这样能快速定位到。定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。

如需持久化到数据库中:

首先需要把MySql的驱动放到ActiveMQ的Lib目录下,例如:mysql-connector-java-5.0.4-bin.jar,在conf/acticvemq.xml中更改persistenceAdapter节点配置并且引用定义的mysql-ds数据源。

dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候创建数据表,默认值是true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为true,之后改成false。

在conf/acticvemq.xml中定义mysql-ds,如下。

然后重新启动消息队列,你会发现多了3张表activemq_acks,activemq_lock,activemq_msgs。

activemq_msgs用于存储消息,然后启动消费者,发现Mysql中已经没有这条消息了。

activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。

activemq_lock在集群环境中才有用。

PERSISTENT (持久消息)和 NON_PERSISTENT(非持久消息),默认为持久消息。持久化的消息在MQ服务器宕机之后,消息不会丢失,在重启服务的时候,消息将恢复。

prefetch:

ActiveMQ的prefetch机制。当消费者去获取消息时,不会一条一条去获取,而是一次性获取一批,默认是1000条。

假设有三个消费者,接收从1到99,共99条消息:

consumer A:1,4,7…

consumer B:2,5,8…

consumer C:3,6,9…

按照默认分配策略,将会把消息如上预分配。

这些预获取的消息,在还没确认消费之前,在管理控制台还是可以看见这些消息的,但是不会再分配给其他消费者,此时这些消息的状态应该算作“已分配未消费”,如果消息最后被消费,则会在服务器端被删除。如果消费者崩溃,则这些消息会被重新分配给新的消费者。但是如果消费者既不消费确认,又不崩溃,那这些消息就永远躺在消费者的缓存区里无法处理。

假设一种情况:某个consumer C性能较差,处理信息速度很慢。会导致consumer C任有消息积压,但consumer A, consumer B已经空闲。

解决方案:将consumer C 的 prefetch设为1,每次处理1条消息,处理完再去取。

prefetchPolicy.setQueuePrefetch(1);

connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);//实例化连接工厂

connectionFactory.setPrefetchPolicy(prefetchPolicy);

TimeToLive:

表示一个消息的有效期。只有在这个有效期内,消息消费者才可以消费这个消息。默认值为0,表示消息永不过期。

如果使用TTL来判定消息的过期,那么就首先需要确保Producer、broker两者的系统时间要尽可能的一致,Consumer也尽可能的和broker的时间保持一致。Broker将会在接收Producer消息时,以及将消息发送给Consumer之前都会检测消息是否过期,判断过期的方法也就是根据JMSExpiration和当前时间戳比较。

上一篇 怎么选择更好的java培训机构
下一篇 返回列表