spring-stream plugin released

Posted 2024-08-25 05:19:42 ‐ 1 min read

The spring-stream plugin is a programming tool for real-time streaming message processing, which can greatly simplify message processing based on files, redis stream, and kafka.

Here is a simple producer:

#[auto_config(WebConfigurator)]
#[tokio::main]
async fn main() {
    App::new()
        .add_plugin(StreamPlugin)
        .add_plugin(WebPlugin)
        .run()
        .await
}

#[get("/")]
async fn send_msg(Component(producer): Component<Producer>) -> Result<impl IntoResponse> {
    let now = SystemTime::now();
    let json = json!({
        "success": true,
        "msg": format!("This message was sent at {:?}", now),
    });
    let resp = producer
        .send_json("topic", json)
        .await
        .context("send msg failed")?;

    let seq = resp.sequence();
    Ok(Json(json!({"seq":seq})))
}

Producer is used to send messages to the message store. Spring-stream is implemented using sea-streamer at the bottom layer, which abstracts file, stdio, redis, and kafka The message storage layer allows developers to send and process messages using a unified interface.

Here is a simple consumer code:

#[tokio::main]
async fn main() {
    App::new()
        .add_plugin(StreamPlugin)
        .add_consumer(consumers())
        .run()
        .await
}

fn consumers() -> Consumers {
    Consumers::new().typed_consumer(listen_topic_do_something)
}

#[stream_listener("topic")]
async fn listen_topic_do_something(Json(payload): Json<Payload>) {
    tracing::info!("{:#?}", payload);
    // do something
}

Click here to view the relevant documentation.