spring-stream plugin released
Posted 2024-08-25 05:19:42 ‐ 1 min read
The spring-stream plugin is a programming tool for real-time streaming message processing, which can greatly simplify message processing based on files, redis stream, and kafka.
Here is a simple producer:
#[auto_config(WebConfigurator)]
#[tokio::main]
async fn main() {
App::new()
.add_plugin(StreamPlugin)
.add_plugin(WebPlugin)
.run()
.await
}
#[get("/")]
async fn send_msg(Component(producer): Component<Producer>) -> Result<impl IntoResponse> {
let now = SystemTime::now();
let json = json!({
"success": true,
"msg": format!("This message was sent at {:?}", now),
});
let resp = producer
.send_json("topic", json)
.await
.context("send msg failed")?;
let seq = resp.sequence();
Ok(Json(json!({"seq":seq})))
}
Producer is used to send messages to the message store. Spring-stream is implemented using sea-streamer at the bottom layer, which abstracts file, stdio, redis, and kafka The message storage layer allows developers to send and process messages using a unified interface.
Here is a simple consumer code:
#[tokio::main]
async fn main() {
App::new()
.add_plugin(StreamPlugin)
.add_consumer(consumers())
.run()
.await
}
fn consumers() -> Consumers {
Consumers::new().typed_consumer(listen_topic_do_something)
}
#[stream_listener("topic")]
async fn listen_topic_do_something(Json(payload): Json<Payload>) {
tracing::info!("{:#?}", payload);
// do something
}
Click here to view the relevant documentation.