程序问答   发布时间:2022-06-02  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了Spring Kafka的Spring Boot Rest API大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决Spring Kafka的Spring Boot Rest API?

开发过程中遇到Spring Kafka的Spring Boot Rest API的问题如何解决?下面主要结合日常开发的经验,给出你关于Spring Kafka的Spring Boot Rest API的解决方法建议,希望对你解决Spring Kafka的Spring Boot Rest API有所启发或帮助;

您需要使用a ReplyingKafkaTemplate将结果返回到rest控制器。

参见ReplyingKafkaTemplate。

该文档有一个示例。

@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static voID main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @autowired
    private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;

    @GetMapPing(path = "/get")
    public List<String> getThem() throws Exception {
        requestReplyFuture<String, String, List<String>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        return future.get(10, TimeUnit.SECONDS).value();
    }

    @KafkaListener(ID = "so63058608-1", topics = "so63058608-1", splitIterables = falsE)
    @SendTo
    public List<String> returnList(@Payload(required = falsE) String payload) {
        return new ArrayList<>(List.of("foo", "bar", "baz"));
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentmessageListenerContainer<String, List<String>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<String>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentmessageListenerContainer<String, List<String>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

        ConcurrentmessageListenerContainer<String, List<String>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerPropertIEs().setGroupID("so63058608-2");
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public Newtopic topic1() {
        return topicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    @Bean
    public Newtopic topic3() {
        return topicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }

}



spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earlIEst
spring.kafka.consumer.propertIEs.spring.Json.trusted.packages=*

spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.Jsonserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.Jsonserializer



$ curl localhost:8080/get
["foo","bar","baz"]

并返回一些对象的列表…

@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static voID main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @autowired
    private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;

    @GetMapPing(path = "/get")
    public List<Foo> getThem() throws Exception {
        requestReplyFuture<String, String, List<Foo>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        List<Foo> result = future.get(10, TimeUnit.SECONDS).value();
        LOG.info(result.toString());
        return result;
    }

    @KafkaListener(ID = "so63058608-1", topics = "so63058608-1", splitIterables = falsE)
    @SendTo
    public List<Foo> returnList(@Payload(required = falsE) String payload) {
        return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz")));
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentmessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentmessageListenerContainer<String, List<Foo>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

        ConcurrentmessageListenerContainer<String, List<Foo>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerPropertIEs().setGroupID("so63058608-2");
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public Newtopic topic1() {
        return topicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    @Bean
    public Newtopic topic3() {
        return topicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }

    public static JavaType returnType(byte[] data, headers headers) {
        return TypeFactory.defaulTinstance()
                .constructCollectionlikeType(List.class, Foo.class);
    }

}

class Foo {

    private String bar;

    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getbar() {
        return this.bar;
    }

    public voID setbar(String bar) {
        this.bar = bar;
    }

    @OverrIDe
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}



spring.kafka.consumer.propertIEs.spring.Json.value.type.method=com.example.demo.so63058608Application.returnType



[Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]

解决方法

我已经设计了一个Spring Boot REST API ADD和GET方法

    @RestController("ProductV1Controller")
    public class ProductController 
     {

         private final IProductProducer _productProducer;
         public ProductController(IProductProducer productProducer) {
        _productProducer = productProducer;}

         @PostMapping()
            void AddProduct(@Valid @requestBody ProductViewModel product) {
                _productProducer.AddProduct(product);
            }

        @GetMapping()
            List<ProductViewModel> Products() {
                var test = _productProducer.GetProducts();
                return _productProducer.GetProducts();
            }
}

服务层@H_874_33@
@service

    public class ProductProducer implements IProductProducer{
        private final KafkaTemplate<String,Object> _template;

        public ProductProducer(KafkaTemplate<String,Object> _templatE) {
            this._template = _template;
        }

        @Override
        public List<ProductViewModel> GetProducts() {
            this._template.send(ProductTopicConstants.GET_PRODUCTS,null);
            return List.of(new ProductViewModel("","","")); --> Need to return the value from the kafka
        }

        @Override
        public void AddProduct(ProductViewModel product) {
            this._template.send(ProductTopicConstants.ADD_PRODUCT,product);
        }

    }

卡夫卡听众@H_874_33@
 @KafkaListener(id = ProductTopicConstants.GET_PRODUCTS,topics = ProductTopicConstants.GET_PRODUCTS)
    public List<Product> GetProducts() {
        return _productRepository.findAll();
    }

在服务层中,GetProducts()我需要返回来自的项目列表。_productRepository.findAll();

使用Spring kafka进行REST API的最佳方法是什么?

大佬总结

以上是大佬教程为你收集整理的Spring Kafka的Spring Boot Rest API全部内容,希望文章能够帮你解决Spring Kafka的Spring Boot Rest API所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。