程序问答   发布时间:2022-06-02  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了如何在 Spring 测试中监视自动装配的 bean大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决如何在 Spring 测试中监视自动装配的 bean?

开发过程中遇到如何在 Spring 测试中监视自动装配的 bean的问题如何解决?下面主要结合日常开发的经验,给出你关于如何在 Spring 测试中监视自动装配的 bean的解决方法建议,希望对你解决如何在 Spring 测试中监视自动装配的 bean有所启发或帮助;

我有一个简单的日志处理程序 bean 配置,我将其注入到 IntegrationFlow

@Configuration
class LogHandlerConfiguration {

    private LoggingHandler handler;

    @Bean
    public messageHandler kafkasuccessHandler() {
        return getLogger(LoggingHandler.Level.INFO);
    }

    @Bean(name="kafkaFailureHandler")
    public messageHandler kafkaFailureHandler() {
        return getLogger(LoggingHandler.Level.ERROR);
    }

    private LoggingHandler getLogger(LoggingHandler.Level level) {
        handler = new LoggingHandler(level);
        handler.setShouldLogFullmessage(Boolean.TRUE);
        return handler;
    }
}

要测试的集成流程

@Bean
IntegrationFlow kafkaFailureFlow(ExecutorChAnnel kafkaErrorChAnnel,messageHandler kafkaFailureHandler) {
    return IntegrationFlows.from(kafkaErrorChAnnel)
            .transform("payload.Failedmessage")
            .handle(kafkaFailureHandler)
            .get();
}

这是我的测试

@SpyBean
messageHandler kafkaFailureHandler;

@BeforeEach
public voID setup() {
    mockitoAnnotations.openmocks(KafkaPublishFailureTest.class);
}

@Test
voID testFailedKafkaPublish() {

    //Dummy message
    Map<String,String> map = new HashMap<>();
    map.put("key","value");
    // Publish message
    message<Map<String,String>> message = messageBuilder.withPayload(map)
            .setheader("X-UPSTREAM-TYPE","alm")
            .setheader("X-UPSTREAM-INSTANCE","jira")
            .setheader("X-messaGE-KEY","key-1")
            .build();

    kafkaGateway.publish(messagE);
    // Failure handler called
    mockito.verify(kafkaFailureHandler,mockito.timeout(0).atleastOnce()).handlemessage(
            ArgumentMatchers.any(message.class));
}

我们创建了一个通用的 Kafka Producer、Consumer 配置,下游应用可以将最适合其需求的失败和成功处理程序附加到该配置上。我无法验证在这种情况下 LoggingHandler 至少被调用了一次。

failureHandlerExecturoeChAnnel 支持的 ThreadPoolTaskExecutor 下执行

@Bean
ExecutorChAnnel kafkaErrorChAnnel(Executor threadPoolExecutor) {
    return messageChAnnels.executor("kafkaErrorChAnnel",threadPoolExecutor).get();
}

通过重试建议处理失败

@Bean
requestHandlerRetryAdvice retryAdvice(ExecutorChAnnel kafkaErrorChAnnel) {
    requestHandlerRetryAdvice retryAdvice = new requestHandlerRetryAdvice();
    retryAdvice.setRecoveryCallBACk(new ErrormessageSendingRecoverer(kafkaErrorChAnnel));
    return retryAdvice;
}

运行测试时出现此错误

java.lang.IllegalStateException: No bean found for deFinition [SpyDeFinition@44dfdd58 name = '',typetoSpy = org.springframework.messaging.messageHandler,reset = AFTER]
    at org.springframework.util.Assert.state(Assert.java:97) ~[spring-core-5.3.4.jar:5.3.4]
    at org.springframework.boot.test.mock.mockito.mockitopostProcessor.inject(mockitopostProcessor.java:351) ~[spring-boot-test-2.4.3.jar:2.4.3]

解决方法

这是我尝试过的并且有效的方法:

@SpringBootApplication
public class Demo1Application {

    public static void main(String[] args) {
        SpringApplication.run(Demo1Application.class,args);
    }

    @Bean
    ExecutorChAnnel kafkaErrorChAnnel(TaskExecutor taskExecutor) {
        return new ExecutorChAnnel(taskExecutor);
    }

    @Bean
    public messageHandler kafkaFailureHandler() {
        LoggingHandler handler = new LoggingHandler(LoggingHandler.Level.ERROR);
        handler.setShouldLogFullmessage(Boolean.TRUE);
        return handler;
    }

    @Bean
    IntegrationFlow kafkaFailureFlow(ExecutorChAnnel kafkaErrorChAnnel,messageHandler kafkaFailureHandler) {
        return IntegrationFlows.from(kafkaErrorChAnnel)
                .transform("payload.failedmessage")
                .handle(kafkaFailureHandler)
                .get();
    }

}

@SpringBootTest
class Demo1ApplicationTests {

    @Autowired
    ExecutorChAnnel kafkaErrorChAnnel;

    @SpyBean
    messageHandler kafkaFailureHandler;

    @Test
    void testSpyBean() throws InterruptedException {
        messagingException payload = new messageHandlingException(new Genericmessage<>("test"));
        this.kafkaErrorChAnnel.send(new Errormessage(payload));
        Thread.sleep(1000);
        mockito.verify(this.kafkaFailureHandler).handlemessage(ArgumentMatchers.any(message.class));
    }

}

也许您的问题是您没有在 LogHandlerConfiguration 配置中包含 @SpringBootTest。这就是为什么我要求一个简单的项目来玩。 您具有所有这些属性的代码太自定义了,无法复制/粘贴到我的环境中...

还要注意那个Thread.sleep(1000);。由于您的 kafkaErrorChAnnelExecutorChAnnel,消息消费发生在离开主测试线程的不同线程上,并由于竞争条件而导致失败。很难猜测正确的时间,所以最好存根一个模拟方法来实现一些线程屏障,比如 new CountDownLatch(1) 并在测试中等待它。

在主题之外,您还可以研究 Spring 集成测试框架:https://docs.spring.io/spring-integration/docs/current/reference/html/tesTing.html#test-context

,

所以,被挂断了为什么不 @SpyBean?有两个问题

  1. 成功和失败处理程序都是 messageHandlers 混淆@SpyBean
  2. Kafka 生产者等待时间太长,即 1000 毫秒

这是最终工作的结果,使用命名的 bean

@Bean("kafkaFailureHandler")
public messageHandler kafkaFailureHandler() {
    LoggingHandler handler = new LoggingHandler(LoggingHandler.Level.INFO);
    handler.setShouldLogFullmessage(Boolean.TRUE);
    return handler;
}

然后在测试中也减少了最大块

@DirtiesContext
@SpringBootTest(classes = {KafkaHandlerConfiguration.class,SwiftalkKafkaGateway.class})
@SpringIntegrationTest(noAutoStartup = {"kafkaFailureFlow"})
@TestPropertysource(properties = {
        "spring.main.bAnner-mode=off","logging.level.root=INFO","logging.level.org.springframework=INFO","logging.level.org.springframework.integration=DEBUG","spring.kafka.producer.properties.max.block.ms=50","spring.kafka.producer.bootstrap-servers=localhost:9999","spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.Stringserializer","spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.Jsonserializer",})
public class KafkaPublishFailureTest {

    private static final Logger log = LogManager.getLogger(KafkaPublishFailureTest.class);

    @Autowired
    SwiftalkKafkaGateway kafkaGateway;

    @SpyBean(name = "kafkaFailureHandler")
    messageHandler kafkaFailureHandler;

    @Test
    @SuppressWarnings("all")
    void testFailedKafkaPublish() throws InterruptedException {

        //Dummy message
        Map<String,String> map = new HashMap<>();
        map.put("key","value");
        // Publish message
        message<Map<String,String>> message = messageBuilder.withPayload(map)
                .setHeader("X-UPSTREAM-TYPE","alm")
                .setHeader("X-UPSTREAM-INSTANCE","jira")
                .setHeader("X-messaGE-KEY","key-1")
                .build();

        kafkaGateway.publish(messagE);
        verify(this.kafkaFailureHandler,timeout(500)).handlemessage(any(message.class));
    }

}

注意 spring.kafka.producer.properties.max.block.ms=50verify(this.kafkaFailureHandler,timeout(500)).handlemessage(any(message.class));

大佬总结

以上是大佬教程为你收集整理的如何在 Spring 测试中监视自动装配的 bean全部内容,希望文章能够帮你解决如何在 Spring 测试中监视自动装配的 bean所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。