spring-stream插件

spring-stream是基于sea-streamer实现的

crates.io Documentation

依赖

spring-stream = { version = "<version>",features=["file"] }

spring-stream支持filestdiorediskafka四种消息存储。

可选的features: json.

配置项

[stream]
uri = "file://./stream"      # StreamerUri 数据流地址

StreamUri支持file、stdio、redis、kafka。uri的格式具体参考StreamerUri

  • stdio适合命令行项目。
  • file适合单机部署的项目。
  • redis适合分布式部署的项目。Redis5.0提供了stream数据结构,因此要求redis版本大于5.0。详细可以参考redis stream官方文档
  • kafka适合消息量更大的分布式部署的项目。Kafka可以用兼容redpanda替代,它是C++编写的兼容kafka协议的高性能消息中间件,用它可以彻底摆脱Kafka依赖的JVM。

流的详细配置

# 文件流配置
[stream.file]
connect = { create_file = "CreateIfNotExists" }

# 标准流配置
[stream.stdio]
connect = { loopback = false }

# redis流配置
[stream.redis]
connect = { db=0,username="user",password="passwd" }

# kafka流配置
[stream.kafka]
connect = { sasl_options={mechanism="Plain",username="user",password="passwd"}}

发送消息

StreamPlugin注册了一个Producer用于发送消息。如果需要发送json格式的消息,需要在依赖项中添加json的feature:

spring-stream = { version = "0.1.1", features=["file","json"] }
1#[auto_config(WebConfigurator)]
2#[tokio::main]
3async fn main() {
4 App::new()
5 .add_plugin(StreamPlugin)
6 .add_plugin(WebPlugin)
7 .run()
8 .await
9}
10
11#[get("/")]
12async fn send_msg(Component(producer): Component<Producer>) -> Result<impl IntoResponse> {
13 let now = SystemTime::now();
14 let json = json!({
15 "success": true,
16 "msg": format!("This message was sent at {:?}", now),
17 });
18 let resp = producer
19 .send_json("topic", json)
20 .await
21 .context("send msg failed")?;
22
23 let seq = resp.sequence();
24 Ok(Json(json!({"seq":seq})))
25}

消费消息

spring-stream提供了stream_listener的过程宏来订阅指定topic的消息,代码如下:

1#[tokio::main]
2async fn main() {
3 App::new()
4 .add_plugin(StreamPlugin)
5 .add_consumer(consumers())
6 .run()
7 .await
8}
9
10fn consumers() -> Consumers {
11 Consumers::new().typed_consumer(listen_topic_do_something)
12}
13
14#[stream_listener("topic", file_consumer_options = fill_file_consumer_options)]
15async fn listen_topic_do_something(Json(payload): Json<Payload>) {
16 tracing::info!("{:#?}", payload);
17}
18
19fn fill_file_consumer_options(opts: &mut FileConsumerOptions) {
20 opts.set_auto_stream_reset(AutoStreamReset::Earliest);
21}

完整示例代码查看stream-file-examplestream-redis-examplestream-kafka-example

读取配置

你可以用Config抽取toml中的配置。用法上和spring-web完全一致。

#[derive(Debug, Configurable, Deserialize)]
#[config_prefix = "custom"]
struct CustomConfig {
    a: u32,
    b: bool,
}

#[stream_listener("topic")]
async fn use_toml_config(Config(conf): Config<CustomConfig>) -> impl IntoResponse {
    format!("a={}, b={}", conf.a, conf.b)
}

在你的配置文件中添加相应配置:

[custom]
a = 1
b = true