spring-stream插件
spring-stream是基于sea-streamer实现的
依赖
spring-stream = { version = "<version>",features=["file"] }
spring-stream支持file
、stdio
、redis
、kafka
四种消息存储。
可选的features: json
.
配置项
[stream]
uri = "file://./stream" # StreamerUri 数据流地址
StreamUri支持file、stdio、redis、kafka。uri的格式具体参考StreamerUri。
- stdio适合命令行项目。
- file适合单机部署的项目。
- redis适合分布式部署的项目。Redis5.0提供了stream数据结构,因此要求redis版本大于5.0。详细可以参考redis stream官方文档。
- kafka适合消息量更大的分布式部署的项目。Kafka可以用兼容redpanda替代,它是C++编写的兼容kafka协议的高性能消息中间件,用它可以彻底摆脱Kafka依赖的JVM。
流的详细配置
# 文件流配置
[stream.file]
connect = { create_file = "CreateIfNotExists" }
# 标准流配置
[stream.stdio]
connect = { loopback = false }
# redis流配置
[stream.redis]
connect = { db=0,username="user",password="passwd" }
# kafka流配置
[stream.kafka]
connect = { sasl_options={mechanism="Plain",username="user",password="passwd"}}
发送消息
StreamPlugin
注册了一个Producer
用于发送消息。如果需要发送json格式的消息,需要在依赖项中添加json
的feature:
spring-stream = { version = "0.1.1", features=["file","json"] }
1 #[auto_config(WebConfigurator)]
2 #[tokio::main]
3 async fn main() {
4 App::new()
5 .add_plugin(StreamPlugin)
6 .add_plugin(WebPlugin)
7 .run()
8 .await
9 }
10
11 #[get("/")]
12 async fn send_msg(Component(producer): Component<Producer>) -> Result<impl IntoResponse> {
13 let now = SystemTime::now();
14 let json = json!({
15 "success": true,
16 "msg": format!("This message was sent at {:?}", now),
17 });
18 let resp = producer
19 .send_json("topic", json)
20 .await
21 .context("send msg failed")?;
22
23 let seq = resp.sequence();
24 Ok(Json(json!({"seq":seq})))
25 }
消费消息
spring-stream
提供了stream_listener
的过程宏来订阅指定topic的消息,代码如下:
1 #[tokio::main]
2 async fn main() {
3 App::new()
4 .add_plugin(StreamPlugin)
5 .add_consumer(consumers())
6 .run()
7 .await
8 }
9
10 fn consumers() -> Consumers {
11 Consumers::new().typed_consumer(listen_topic_do_something)
12 }
13
14 #[stream_listener("topic", file_consumer_options = fill_file_consumer_options)]
15 async fn listen_topic_do_something(Json(payload): Json<Payload>) {
16 tracing::info!("{:#?}", payload);
17 }
18
19 fn fill_file_consumer_options(opts: &mut FileConsumerOptions) {
20 opts.set_auto_stream_reset(AutoStreamReset::Earliest);
21 }
完整示例代码查看stream-file-example、stream-redis-example、stream-kafka-example
读取配置
你可以用Config
抽取toml中的配置。用法上和spring-web
完全一致。
#[derive(Debug, Configurable, Deserialize)]
#[config_prefix = "custom"]
struct CustomConfig {
a: u32,
b: bool,
}
#[stream_listener("topic")]
async fn use_toml_config(Config(conf): Config<CustomConfig>) -> impl IntoResponse {
format!("a={}, b={}", conf.a, conf.b)
}
在你的配置文件中添加相应配置:
[custom]
a = 1
b = true