spring-stream插件发布了
Posted 2024-08-25 05:19:42 ‐ 1 min read
spring-stream插件是实时流式处理消息的编程工具,可以大大简化基于文件、redis stream、kafka的消息处理
下面是一个简单的生产者:
#[auto_config(WebConfigurator)]
#[tokio::main]
async fn main() {
App::new()
.add_plugin(StreamPlugin)
.add_plugin(WebPlugin)
.run()
.await
}
#[get("/")]
async fn send_msg(Component(producer): Component<Producer>) -> Result<impl IntoResponse> {
let now = SystemTime::now();
let json = json!({
"success": true,
"msg": format!("This message was sent at {:?}", now),
});
let resp = producer
.send_json("topic", json)
.await
.context("send msg failed")?;
let seq = resp.sequence();
Ok(Json(json!({"seq":seq})))
}
Producer用于向消息存储发送消息。spring-stream底层使用sea-streamer实现,它抽象了file、stdio、redis、kafka消息存储层,使开发者可以用统一的接口来发送和处理消息。
下面是一个简单的消费者的代码:
#[tokio::main]
async fn main() {
App::new()
.add_plugin(StreamPlugin)
.add_consumer(consumers())
.run()
.await
}
fn consumers() -> Consumers {
Consumers::new().typed_consumer(listen_topic_do_something)
}
#[stream_listener("topic")]
async fn listen_topic_do_something(Json(payload): Json<Payload>) {
tracing::info!("{:#?}", payload);
// do something
}
点击这里可以查看相关文档。