spring-stream Plugin

spring-stream is based on sea-streamer


Dependencies

spring-stream = { version = "0.1.1", features=["file"] }

spring-stream supports four message storage backends: file, stdio, redis, and kafka.

Configuration items

[stream]
uri = "file://./stream"   # StreamerUri data stream address

StreamerUri supports file, stdio, redis, and kafka. For the URI format, refer to the StreamerUri documentation.

  • stdio is suitable for command line projects.
  • file is suitable for stand-alone deployment projects.
  • redis is suitable for distributed deployment projects. Redis 5.0 introduced the stream data structure, so Redis 5.0 or later is required. For details, refer to the official Redis Streams documentation.
  • kafka is suitable for distributed deployment projects with larger message volumes. Kafka can be replaced with Redpanda, a high-performance message broker written in C++ that is compatible with the Kafka protocol and removes the dependency on the JVM that Kafka requires.
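
The choice of backend follows from the scheme of the configured uri. The sketch below illustrates that mapping using only the standard library. The Backend enum and the backend_for_uri function are hypothetical names introduced for illustration; they are not part of spring-stream or sea-streamer, which parse the URI through StreamerUri.

```rust
/// The four backends named in this README.
/// Hypothetical enum, for illustration only.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum Backend {
    Stdio,
    File,
    Redis,
    Kafka,
}

/// Pick a backend from the scheme part of a stream URI.
/// Returns None when the URI has no "://" separator or the
/// scheme is not one of the four listed above.
fn backend_for_uri(uri: &str) -> Option<Backend> {
    // Everything before "://" is the scheme.
    let (scheme, _rest) = uri.split_once("://")?;
    match scheme.to_ascii_lowercase().as_str() {
        "stdio" => Some(Backend::Stdio),
        "file" => Some(Backend::File),
        "redis" => Some(Backend::Redis),
        "kafka" => Some(Backend::Kafka),
        _ => None,
    }
}
```

With this sketch, the uri from the configuration above, "file://./stream", maps to Backend::File, and a uri with an unlisted scheme maps to None.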

Detailed stream configuration

# File stream configuration
[stream.file]
connect = { create_file = "CreateIfNotExists" }

# Standard stream configuration
[stream.stdio]
connect = { loopback = false }

# Redis stream configuration
[stream.redis]
connect = { db=0,username="user",password="passwd" }

# Kafka stream configuration
[stream.kafka]
connect = { sasl_options={mechanism="Plain",username="user",password="passwd"}}

Send message

StreamPlugin registers a Producer for sending messages. To send messages in JSON format, add the json feature to the dependency:

spring-stream = { version = "0.1.1", features=["file","json"] }

use anyhow::Context;
use serde_json::json;
use spring::{auto_config, App};
use spring_stream::{Producer, StreamPlugin};
use spring_web::error::Result;
use spring_web::get;
use spring_web::{
    axum::response::{IntoResponse, Json},
    extractor::Component,
    WebConfigurator, WebPlugin,
};
use std::time::SystemTime;

#[auto_config(WebConfigurator)]
#[tokio::main]
async fn main() {
    App::new()
        .add_plugin(StreamPlugin)
        .add_plugin(WebPlugin)
        .run()
        .await
}

// The Producer registered by StreamPlugin is injected as a component.
#[get("/")]
async fn send_msg(Component(producer): Component<Producer>) -> Result<impl IntoResponse> {
    let now = SystemTime::now();
    let json = json!({
        "success": true,
        "msg": format!("This message was sent at {:?}", now),
    });
    // Publish the JSON payload to the stream named "topic".
    let resp = producer
        .send_json("topic", json)
        .await
        .context("send msg failed")?;

    // Return the sequence number reported for the sent message.
    let seq = resp.sequence();
    Ok(Json(json!({"seq":seq})))
}

Consume messages

spring-stream provides a procedural macro, stream_listener, that subscribes a function to messages from the specified topics. For example:

use spring::tracing;
use spring::App;
use spring_stream::consumer::Consumers;
use spring_stream::extractor::Json;
use spring_stream::file::AutoStreamReset;
use spring_stream::handler::TypedConsumer;
use spring_stream::stream_listener;
use spring_stream::{file::FileConsumerOptions, StreamConfigurator, StreamPlugin};
use stream_file_example::Payload;

#[tokio::main]
async fn main() {
    App::new()
        .add_plugin(StreamPlugin)
        .add_consumer(consumers())
        .run()
        .await
}

// Collect the listener functions to register with the app.
fn consumers() -> Consumers {
    Consumers::new().typed_consumer(listen_topic_do_something)
}

// Subscribe to "topic" and "topic2"; the payload is deserialized from JSON.
#[stream_listener(
    "topic",
    "topic2",
    file_consumer_options = fill_file_consumer_options
)]
async fn listen_topic_do_something(Json(payload): Json<Payload>) {
    tracing::info!("{:#?}", payload);
}

// Customize the file backend's consumer options for this listener.
fn fill_file_consumer_options(opts: &mut FileConsumerOptions) {
    opts.set_auto_stream_reset(AutoStreamReset::Earliest);
}
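
As a rough mental model of subscribing one function to several topics, the standard-library sketch below registers a handler under multiple topic names and delivers each message to the handlers for its topic. This is only an illustration, not how spring-stream is implemented. TopicRegistry, Handler, subscribe, and dispatch are hypothetical names that do not exist in the crate.

```rust
use std::collections::HashMap;

/// A handler receives the raw message body and returns a log line.
/// Hypothetical type, for illustration only.
type Handler = Box<dyn Fn(&str) -> String>;

/// Hypothetical registry mapping topic names to subscribed handlers.
#[derive(Default)]
struct TopicRegistry {
    handlers: HashMap<String, Vec<Handler>>,
}

impl TopicRegistry {
    /// Subscribe one handler to several topics, similar in spirit to
    /// listing "topic" and "topic2" in the listener above.
    fn subscribe<F>(&mut self, topics: &[&str], handler: F)
    where
        F: Fn(&str) -> String + Clone + 'static,
    {
        for topic in topics {
            self.handlers
                .entry(topic.to_string())
                .or_default()
                .push(Box::new(handler.clone()));
        }
    }

    /// Deliver a message to every handler registered for `topic`
    /// and collect their outputs; unknown topics yield an empty Vec.
    fn dispatch(&self, topic: &str, body: &str) -> Vec<String> {
        self.handlers
            .get(topic)
            .map(|hs| hs.iter().map(|h| h(body)).collect())
            .unwrap_or_default()
    }
}
```

In this sketch, a handler subscribed to both "topic" and "topic2" runs for messages on either one, and a message on any other topic reaches no handler.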

For complete example code, see stream-file-example, stream-redis-example, and stream-kafka-example.