Spring Boot REST API with Spring Kafka
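You need to use a ReplyingKafkaTemplate to return the result to the REST controller.
See ReplyingKafkaTemplate in the Spring for Apache Kafka reference documentation, which includes an example.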
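import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @Autowired
    private ReplyingKafkaTemplate<String, String, List<String>> replyTemplate;

    @GetMapping(path = "/get")
    public List<String> getThem() throws Exception {
        // Send the request, then wait up to 10 seconds each for the send result and the reply.
        RequestReplyFuture<String, String, List<String>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        return future.get(10, TimeUnit.SECONDS).value();
    }

    // Server side: the value returned by the listener is sent back as the reply.
    @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
    @SendTo
    public List<String> returnList(@Payload(required = false) String payload) {
        return new ArrayList<>(List.of("foo", "bar", "baz"));
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, List<String>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

        // The listener container factory needs a template to publish @SendTo replies.
        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, List<String>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<String>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    // Container that consumes replies from the reply topic.
    @Bean
    public ConcurrentMessageListenerContainer<String, List<String>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<String>> containerFactory) {

        ConcurrentMessageListenerContainer<String, List<String>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerProperties().setGroupId("so63058608-2");
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    // Request topic.
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    // Reply topic.
    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }

}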
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
$ curl localhost:8080/get
["foo","bar","baz"]
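The round trip works because each reply can be matched to the request that caused it; Spring Kafka does this with a correlation ID carried in a record header. The plain-Java sketch below illustrates only that idea, using an in-memory map and no broker. The class RequestReplySketch and its methods register and onReply are hypothetical names invented for this illustration; they are not Spring Kafka APIs and do not reflect how the library is implemented internally.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory illustration of request/reply correlation.
// No Kafka is involved; none of these names are Spring Kafka APIs.
class RequestReplySketch {

    // Requests that are still waiting for a reply, keyed by correlation ID.
    private final Map<String, CompletableFuture<List<String>>> pending = new ConcurrentHashMap<>();

    // Requester side: remember the correlation ID and hand back a future for the reply.
    CompletableFuture<List<String>> register(String correlationId) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Reply side: complete the matching future. Returns false for unknown or already answered IDs.
    boolean onReply(String correlationId, List<String> payload) {
        CompletableFuture<List<String>> future = pending.remove(correlationId);
        return future != null && future.complete(payload);
    }

    public static void main(String[] args) throws Exception {
        RequestReplySketch sketch = new RequestReplySketch();
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<List<String>> future = sketch.register(correlationId);

        // Simulate the listener answering on another thread.
        new Thread(() -> sketch.onReply(correlationId, List.of("foo", "bar", "baz"))).start();

        // Block for the reply, as the controller does with future.get(10, TimeUnit.SECONDS).
        System.out.println(future.get(10, TimeUnit.SECONDS)); // prints [foo, bar, baz]
    }
}
```

A reply whose ID is unknown is simply dropped in this sketch, which is one reason each request needs its own unique ID.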
And to return a list of some objects instead:
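import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;

@SpringBootApplication
@RestController
public class So63058608Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63058608Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63058608Application.class, args);
    }

    @Autowired
    private ReplyingKafkaTemplate<String, String, List<Foo>> replyTemplate;

    @GetMapping(path = "/get")
    public List<Foo> getThem() throws Exception {
        RequestReplyFuture<String, String, List<Foo>> future =
                this.replyTemplate.sendAndReceive(new ProducerRecord<>("so63058608-1", 0, null, null));
        LOG.info(future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
        List<Foo> result = future.get(10, TimeUnit.SECONDS).value();
        LOG.info(result.toString());
        return result;
    }

    @KafkaListener(id = "so63058608-1", topics = "so63058608-1", splitIterables = false)
    @SendTo
    public List<Foo> returnList(@Payload(required = false) String payload) {
        return new ArrayList<>(List.of(new Foo("foo"), new Foo("bar"), new Foo("baz")));
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, List<Foo>> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, List<Foo>> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, List<Foo>> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, List<Foo>> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, List<Foo>> containerFactory) {

        ConcurrentMessageListenerContainer<String, List<Foo>> container =
                containerFactory.createContainer("so63058608-2");
        container.getContainerProperties().setGroupId("so63058608-2");
        container.setBatchErrorHandler(new BatchLoggingErrorHandler());
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so63058608-1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("so63058608-2").partitions(1).replicas(1).build();
    }

    // Tells the JsonDeserializer to deserialize values as List<Foo>;
    // referenced from the spring.json.value.type.method consumer property.
    public static JavaType returnType(byte[] data, Headers headers) {
        return TypeFactory.defaultInstance()
                .constructCollectionLikeType(List.class, Foo.class);
    }

}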
class Foo {

    private String bar;

    public Foo() {
    }

    public Foo(String bar) {
        this.bar = bar;
    }

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType
[Foo [bar=foo], Foo [bar=bar], Foo [bar=baz]]
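Both versions of getThem() block on future.get(10, TimeUnit.SECONDS) and declare throws Exception, so a missing reply surfaces as a TimeoutException. The plain-Java sketch below shows one way a caller might handle that case explicitly. ReplyTimeoutSketch and awaitReply are hypothetical names, and falling back to an empty list is only an assumption made for illustration; a real endpoint might prefer an error status or a retry.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a reply and handles the timeout case explicitly.
class ReplyTimeoutSketch {

    static List<String> awaitReply(Future<List<String>> reply, long timeout, TimeUnit unit) {
        try {
            return reply.get(timeout, unit);
        } catch (TimeoutException e) {
            // Assumption for this sketch: no reply in time means "no results".
            return List.of();
        } catch (ExecutionException e) {
            // The request or reply failed; surface the underlying cause.
            throw new IllegalStateException("Reply failed", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for reply", e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> answered =
                CompletableFuture.completedFuture(List.of("foo", "bar", "baz"));
        CompletableFuture<List<String>> unanswered = new CompletableFuture<>();

        System.out.println(awaitReply(answered, 1, TimeUnit.SECONDS));          // prints [foo, bar, baz]
        System.out.println(awaitReply(unanswered, 100, TimeUnit.MILLISECONDS)); // prints []
    }
}
```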
I have designed a Spring Boot REST API with ADD and GET methods.
@RestController("ProductV1Controller")
public class ProductController {

    private final IProductProducer _productProducer;

    public ProductController(IProductProducer productProducer) {
        _productProducer = productProducer;
    }

    @PostMapping()
    void AddProduct(@Valid @RequestBody ProductViewModel product) {
        _productProducer.AddProduct(product);
    }

    @GetMapping()
    List<ProductViewModel> Products() {
        var test = _productProducer.GetProducts();
        return _productProducer.GetProducts();
    }
}
@Service
public class ProductProducer implements IProductProducer {

    private final KafkaTemplate<String, Object> _template;

    public ProductProducer(KafkaTemplate<String, Object> _template) {
        this._template = _template;
    }

    @Override
    public List<ProductViewModel> GetProducts() {
        this._template.send(ProductTopicConstants.GET_PRODUCTS, null);
        return List.of(new ProductViewModel("", "", "")); // Need to return the value from Kafka here
    }

    @Override
    public void AddProduct(ProductViewModel product) {
        this._template.send(ProductTopicConstants.ADD_PRODUCT, product);
    }
}
@KafkaListener(id = ProductTopicConstants.GET_PRODUCTS, topics = ProductTopicConstants.GET_PRODUCTS)
public List<Product> GetProducts() {
    return _productRepository.findAll();
}
In the service layer, GetProducts() needs to return the list of items that comes from _productRepository.findAll().
What is the best way to build a REST API with Spring Kafka?