一、关于
Spring Cloud Stream是一个用于构建消息驱动微服务应用程序的框架。
Spring Cloud Stream引入了一些概念:
- Destination Binders:目标绑定器,目标指的是 kafka 还是 RabbitMQ,绑定器就是封装了目标中间件的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。
- Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器创建)
- Message:一种规范化的数据结构,生产者和消费者基于这个数据结构通过外部消息系统与目标绑定器和其他应用程序通信。
使用Spring Cloud Stream的好处:
- 简化开发,无需关注具体MQ的使用方法
- 降低与MQ的耦合度,可以通过配置很简单的切换MQ(目前支持kafka、rabbitmq以及rocketmq)
本文代码仓库地址:https://github.com/lazyrabb1t/rabb-springcloud-demo
二、使用
1、创建rabb-stream-consumer模块
2、引入依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
3、配置
spring:
cloud:
stream:
bindings:
# 方法名称-in-索引,索引值为参数个数
sink1-in-0:
# 指定topic名称
destination: topic1
# 指定group
group: group1
sink2-in-0:
destination: topic2
group: group2
kafka:
binder:
brokers: 127.0.0.1:9092
# 定义函数
function:
definition: sink1;sink2
4、添加消费逻辑
@Bean
public Consumer<Date> sink1() {
return System.out::println;
}
@Bean
public Consumer<String> sink2() {
return message -> {
System.out.println("sink2 receive message:" + message);
};
}
5、创建rabb-stream-provider模块
6、添加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
7、配置
spring:
cloud:
stream:
bindings:
source1-out-0:
destination: topic1
group: group1
source2-out-0:
destination: topic2
group: group2
kafka:
binder:
brokers: 127.0.0.1:9092
server:
port: 18000
8、添加生产逻辑
@RestController
public class ProviderController {
@Autowired
private StreamBridge streamBridge;
@RequestMapping("/send1")
public String send1() {
streamBridge.send("source1-out-0", new Date());
return "success1";
}
@RequestMapping("/send2")
public String send2() {
streamBridge.send("source2-out-0", "How are you!");
return "success2";
}
}
9、测试
访问生产模块的接口方法,可以在消费者模块的控制台看到相应输出
参考
https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/