大佬教程收集整理的这篇文章主要介绍了如何在 Spring 测试中监视自动装配的 bean,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
我有一个简单的日志处理程序 bean 配置,我将其注入到 IntegrationFlow
@Configuration
class LogHandlerConfiguration {
private LoggingHandler handler;
@Bean
public messageHandler kafkasuccessHandler() {
return getLogger(LoggingHandler.Level.INFO);
}
@Bean(name="kafkaFailureHandler")
public messageHandler kafkaFailureHandler() {
return getLogger(LoggingHandler.Level.ERROR);
}
private LoggingHandler getLogger(LoggingHandler.Level level) {
handler = new LoggingHandler(level);
handler.setShouldLogFullmessage(Boolean.TRUE);
return handler;
}
}
要测试的集成流程
@Bean
IntegrationFlow kafkaFailureFlow(ExecutorChAnnel kafkaErrorChAnnel,messageHandler kafkaFailureHandler) {
return IntegrationFlows.from(kafkaErrorChAnnel)
.transform("payload.Failedmessage")
.handle(kafkaFailureHandler)
.get();
}
这是我的测试
@SpyBean
messageHandler kafkaFailureHandler;
@BeforeEach
public voID setup() {
mockitoAnnotations.openmocks(KafkaPublishFailureTest.class);
}
@Test
voID testFailedKafkaPublish() {
//Dummy message
Map<String,String> map = new HashMap<>();
map.put("key","value");
// Publish message
message<Map<String,String>> message = messageBuilder.withPayload(map)
.setheader("X-UPSTREAM-TYPE","alm")
.setheader("X-UPSTREAM-INSTANCE","jira")
.setheader("X-messaGE-KEY","key-1")
.build();
kafkaGateway.publish(messagE);
// Failure handler called
mockito.verify(kafkaFailureHandler,mockito.timeout(0).atleastOnce()).handlemessage(
ArgumentMatchers.any(message.class));
}
我们创建了一个通用的 Kafka Producer、Consumer 配置,下游应用可以将最适合其需求的失败和成功处理程序附加到该配置上。我无法验证在这种情况下 LoggingHandler
至少被调用了一次。
failureHandler
在 ExecturoeChAnnel
支持的 ThreadPoolTaskExecutor
下执行
@Bean
ExecutorChAnnel kafkaErrorChAnnel(Executor threadPoolExecutor) {
return messageChAnnels.executor("kafkaErrorChAnnel",threadPoolExecutor).get();
}
通过重试建议处理失败
@Bean
requestHandlerRetryAdvice retryAdvice(ExecutorChAnnel kafkaErrorChAnnel) {
requestHandlerRetryAdvice retryAdvice = new requestHandlerRetryAdvice();
retryAdvice.setRecoveryCallBACk(new ErrormessageSendingRecoverer(kafkaErrorChAnnel));
return retryAdvice;
}
运行测试时出现此错误
java.lang.IllegalStateException: No bean found for deFinition [SpyDeFinition@44dfdd58 name = '',typetoSpy = org.springframework.messaging.messageHandler,reset = AFTER]
at org.springframework.util.Assert.state(Assert.java:97) ~[spring-core-5.3.4.jar:5.3.4]
at org.springframework.boot.test.mock.mockito.mockitopostProcessor.inject(mockitopostProcessor.java:351) ~[spring-boot-test-2.4.3.jar:2.4.3]
这是我尝试过的并且有效的方法:
@SpringBootApplication
public class Demo1Application {
public static void main(String[] args) {
SpringApplication.run(Demo1Application.class,args);
}
@Bean
ExecutorChAnnel kafkaErrorChAnnel(TaskExecutor taskExecutor) {
return new ExecutorChAnnel(taskExecutor);
}
@Bean
public messageHandler kafkaFailureHandler() {
LoggingHandler handler = new LoggingHandler(LoggingHandler.Level.ERROR);
handler.setShouldLogFullmessage(Boolean.TRUE);
return handler;
}
@Bean
IntegrationFlow kafkaFailureFlow(ExecutorChAnnel kafkaErrorChAnnel,messageHandler kafkaFailureHandler) {
return IntegrationFlows.from(kafkaErrorChAnnel)
.transform("payload.failedmessage")
.handle(kafkaFailureHandler)
.get();
}
}
@SpringBootTest
class Demo1ApplicationTests {
@Autowired
ExecutorChAnnel kafkaErrorChAnnel;
@SpyBean
messageHandler kafkaFailureHandler;
@Test
void testSpyBean() throws InterruptedException {
messagingException payload = new messageHandlingException(new Genericmessage<>("test"));
this.kafkaErrorChAnnel.send(new Errormessage(payload));
Thread.sleep(1000);
mockito.verify(this.kafkaFailureHandler).handlemessage(ArgumentMatchers.any(message.class));
}
}
也许您的问题是您没有在 LogHandlerConfiguration
配置中包含 @SpringBootTest
。这就是为什么我要求一个简单的项目来玩。
您具有所有这些属性的代码太自定义了,无法复制/粘贴到我的环境中...
还要注意那个Thread.sleep(1000);
。由于您的 kafkaErrorChAnnel
是 ExecutorChAnnel
,消息消费发生在离开主测试线程的不同线程上,并由于竞争条件而导致失败。很难猜测正确的时间,所以最好存根一个模拟方法来实现一些线程屏障,比如 new CountDownLatch(1)
并在测试中等待它。
在主题之外,您还可以研究 Spring 集成测试框架:https://docs.spring.io/spring-integration/docs/current/reference/html/tesTing.html#test-context
,所以,被挂断了为什么不 @SpyBean
?有两个问题
@SpyBean
这是最终工作的结果,使用命名的 bean
@Bean("kafkaFailureHandler")
public messageHandler kafkaFailureHandler() {
LoggingHandler handler = new LoggingHandler(LoggingHandler.Level.INFO);
handler.setShouldLogFullmessage(Boolean.TRUE);
return handler;
}
然后在测试中也减少了最大块
@DirtiesContext
@SpringBootTest(classes = {KafkaHandlerConfiguration.class,SwiftalkKafkaGateway.class})
@SpringIntegrationTest(noAutoStartup = {"kafkaFailureFlow"})
@TestPropertysource(properties = {
"spring.main.bAnner-mode=off","logging.level.root=INFO","logging.level.org.springframework=INFO","logging.level.org.springframework.integration=DEBUG","spring.kafka.producer.properties.max.block.ms=50","spring.kafka.producer.bootstrap-servers=localhost:9999","spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.Stringserializer","spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.Jsonserializer",})
public class KafkaPublishFailureTest {
private static final Logger log = LogManager.getLogger(KafkaPublishFailureTest.class);
@Autowired
SwiftalkKafkaGateway kafkaGateway;
@SpyBean(name = "kafkaFailureHandler")
messageHandler kafkaFailureHandler;
@Test
@SuppressWarnings("all")
void testFailedKafkaPublish() throws InterruptedException {
//Dummy message
Map<String,String> map = new HashMap<>();
map.put("key","value");
// Publish message
message<Map<String,String>> message = messageBuilder.withPayload(map)
.setHeader("X-UPSTREAM-TYPE","alm")
.setHeader("X-UPSTREAM-INSTANCE","jira")
.setHeader("X-messaGE-KEY","key-1")
.build();
kafkaGateway.publish(messagE);
verify(this.kafkaFailureHandler,timeout(500)).handlemessage(any(message.class));
}
}
注意 spring.kafka.producer.properties.max.block.ms=50
和 verify(this.kafkaFailureHandler,timeout(500)).handlemessage(any(message.class));
以上是大佬教程为你收集整理的如何在 Spring 测试中监视自动装配的 bean全部内容,希望文章能够帮你解决如何在 Spring 测试中监视自动装配的 bean所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。