大佬教程收集整理的这篇文章主要介绍了使用@TransactionalEventListener 在 JPA 事务中发送消息时,消息未提交(丢失),大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
代码背景:
为了复制一个生产场景,我创建了一个虚拟应用程序,它基本上会在一个事务中在数据库中保存一些东西,并以同样的方法,它publishEvent和publishEvent向rabbitMQ发送消息。
类和用法
事务从此方法开始。:
@OverrIDe
@Transactional
public EmpDTO createEmployeeInTrans(EmpDTO empDto) {
return createEmployee(empDto);
}
此方法将记录保存在数据库中并触发发布事件
@OverrIDe
public EmpDTO createEmployee(EmpDTO empDTO) {
EmpEntity empEntity = new EmpEntity();
BeanUtils.copyPropertIEs(empDTO,empEntity);
System.out.println("<< In Transaction : "+TransactionSynchronizationManager.getCurrentTransactionname()+" >> Saving data for employee " + empDTO.getEmpCode());
// Record data into a database
empEntity = empRepository.save(empEntity);
// Sending event,this will send the message.
eventPublisher.publishEvent(new ActivityEvent(empDTO));
return createResponse(empDTO,empEntity);
}
这是活动事件
import org.springframework.context.ApplicationEvent;
import com.kuldeep.rabbitMQProducer.dto.EmpDTO;
public class ActivityEvent extends ApplicationEvent {
public ActivityEvent(EmpDTO source) {
super(source);
}
}
这是上述事件的 TransactionalEventListener。
//@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public voID onActivitySave(ActivityEvent activityEvent) {
System.out.println("Activity got event ... Sending message .. ");
kRabbitTemplate.convertAndSend(exchange,routingkey,empDTO);
}
这是 kRabbitTemplate 是这样的 bean 配置:
@Bean
public RabbitTemplate kRabbitTemplate(ConnectionFactory connectionFactory) {
final RabbitTemplate kRabbitTemplate = new RabbitTemplate(connectionFactory);
kRabbitTemplate.setChannelTransacted(true);
kRabbitTemplate.setMessageConverter(kJsonMessageConverter());
return kRabbitTemplate;
}
问题定义
当我使用上述代码流在rabbitMQ 上保存记录并发送消息时,我的消息未在服务器上传递意味着它们丢失了。
我对 AMQP 中的事务的理解是:
// this is a snippet from org.springframework.amqp.rabbit.core.RabbitTemplate.doSend()
if (isChannelLocallyTransacted(channel)) {
// Transacted channel created by this template -> commit.
RabbitUtils.commitIfNecessary(channel);
}
我在调查上述代码中消息丢失的原因后发现的内容。
public static RabbitResourceHolder bindResourcetoTransaction(RabbitResourceHolder resourceHolder,ConnectionFactory connectionFactory,boolean synched) {
if (TransactionSynchronizationManager.hasResource(connectionFactory)
|| !TransactionSynchronizationManager.isActualTransactionActive() || !synched) {
return (RabbitResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); // NOSONAR never null
}
TransactionSynchronizationManager.bindResource(connectionFactory,resourceHolder);
resourceHolder.setSynchronizeDWithTransaction(true);
if (TransactionSynchronizationManager.isSynchronizationActive()) {
TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder,connectionFactory));
}
return resourceHolder;
}
在我的代码中,资源绑定后,无法注册同步,因为TransactionSynchronizationManager.isSynchronizationActive()==false
。并且由于它未能注册同步,rabbitMQ 消息没有发生 spring 提交,因为 AbstractPlatformTransactionManager.triggerAfterCompletion
为每次同步调用 RabbitMQ 的提交。
由于上述问题,我遇到了什么问题。
TransactionSynchronizationManager.isSynchronizationActive()==false
的可能根本原因
status.isNewSynchronization()
在 DB 操作之后评估 true
(如果我在没有 ApplicationEvent 的情况下调用 convertAndSend 通常不会发生这种情况)。 private voID triggerAfterCompletion(DefaultTransactionStatus status,int completionStatus) {
if (status.isNewSynchronization()) {
List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
TransactionSynchronizationManager.clearSynchronization();
if (!status.hasTransaction() || status.isNewTransaction()) {
if (status.isDeBUG()) {
logger.trace("Triggering afterCompletion synchronization");
}
// No transaction or new transaction for the current scope ->
// invoke the afterCompletion callbacks immediately
invokeAfterCompletion(synchronizations,completionStatus);
}
else if (!synchronizations.isEmpty()) {
// Existing transaction that we participate in,controlled outsIDe
// of the scope of this Spring transaction manager -> try to register
// an afterCompletion callback with the existing (JTA) transaction.
registerafterCompletionWithExistingTransaction(status.getTransaction(),synchronizations);
}
}
}
我在这个问题上做了什么来克服
我只是在 onActivitySave 方法中添加了 @Transactional(propagation = Propagation.REQUIRES_NEW)
和 on @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
,它在新事务开始时起作用。
我需要知道的
status.isNewSynchronization
?TransactionSynchronizationManager.isActualTransactionActive()==true
?请帮我解决这个问题,这是一个热门的生产问题,我不太确定我所做的修复。
这是一个错误; RabbitMQ 事务代码比 @TransactionalEventListener
代码早很多年。
问题是,通过这种配置,我们处于准事务状态,而确实有一个事务在进行中,同步已经被清除,因为事务已经提交。
使用 @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
有效。
我看到你已经提出了一个问题:
https://github.com/spring-projects/spring-amqp/issues/1309
以后最好在这里提出问题,如果您觉得有错误,请提出问题。不要两者都做。
以上是大佬教程为你收集整理的使用@TransactionalEventListener 在 JPA 事务中发送消息时,消息未提交(丢失)全部内容,希望文章能够帮你解决使用@TransactionalEventListener 在 JPA 事务中发送消息时,消息未提交(丢失)所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。