spring-stream Plugin
spring-stream is built on sea-streamer.
Dependencies
spring-stream = { version = "<version>", features=["file"] }
spring-stream supports four message storage backends: file, stdio, redis, and kafka.
Optional feature: json.
Configuration items
[stream]
uri = "file://./stream" # StreamerUri: address of the data stream
StreamerUri supports the file, stdio, redis, and kafka schemes. For the URI format, refer to the StreamerUri documentation.
- stdio is suitable for command-line projects.
- file is suitable for stand-alone deployments.
- redis is suitable for distributed deployments. Redis introduced the stream data structure in version 5.0, so Redis 5.0 or later is required. For details, refer to the official Redis Streams documentation.
- kafka is suitable for distributed deployments with larger message volumes. Kafka can be replaced with Redpanda, a high-performance message broker written in C++ that is compatible with the Kafka protocol and removes the JVM dependency that Kafka carries.
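The backend is chosen by the scheme of the configured uri. The sketch below illustrates that mapping using only the standard library. Backend and backend_from_uri are hypothetical names for illustration and are not part of spring-stream or sea-streamer; the host and port values are placeholders, and only the four schemes listed above are handled.

```rust
// Hypothetical helper for illustration only; not a spring-stream API.
// It inspects the scheme of the configured `uri` and nothing else.
#[derive(Debug, PartialEq)]
enum Backend {
    Stdio,
    File,
    Redis,
    Kafka,
}

fn backend_from_uri(uri: &str) -> Option<Backend> {
    // Everything before "://" is treated as the scheme.
    let (scheme, _rest) = uri.split_once("://")?;
    match scheme {
        "stdio" => Some(Backend::Stdio),
        "file" => Some(Backend::File),
        "redis" => Some(Backend::Redis),
        "kafka" => Some(Backend::Kafka),
        // Any other scheme is outside the scope of this sketch.
        _ => None,
    }
}

fn main() {
    // Placeholder addresses; adjust them to your own deployment.
    let uris = [
        "stdio://",
        "file://./stream",
        "redis://localhost:6379",
        "kafka://localhost:9092",
    ];
    for uri in uris {
        println!("{} -> {:?}", uri, backend_from_uri(uri));
    }
}
```

In a real project the parsing is done by StreamerUri itself; the sketch only shows which backend each kind of address selects.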
Detailed stream configuration
# File stream configuration
[stream.file]
connect = { create_file = "CreateIfNotExists" }
# Standard stream configuration
[stream.stdio]
connect = { loopback = false }
# Redis stream configuration
[stream.redis]
connect = { db=0,username="user",password="passwd" }
# Kafka stream configuration
[stream.kafka]
connect = { sasl_options={mechanism="Plain",username="user",password="passwd"}}
Send message
StreamPlugin registers a Producer for sending messages. To send messages in JSON format, add the json feature to the dependency:
spring-stream = { version = "0.1.1", features=["file","json"] }
use anyhow::Context;
use serde_json::json;
use spring::{auto_config, App};
use spring_stream::{Producer, StreamPlugin};
use spring_web::error::Result;
use spring_web::get;
use spring_web::{
    axum::response::{IntoResponse, Json},
    extractor::Component,
    WebConfigurator, WebPlugin,
};
use std::time::SystemTime;

#[auto_config(WebConfigurator)]
#[tokio::main]
async fn main() {
    App::new()
        .add_plugin(StreamPlugin)
        .add_plugin(WebPlugin)
        .run()
        .await
}

#[get("/")]
async fn send_msg(Component(producer): Component<Producer>) -> Result<impl IntoResponse> {
    let now = SystemTime::now();
    // Build the JSON payload to publish.
    let json = json!({
        "success": true,
        "msg": format!("This message was sent at {:?}", now),
    });
    // Publish the payload to "topic" and wait for the receipt.
    let resp = producer
        .send_json("topic", json)
        .await
        .context("send msg failed")?;

    // Return the sequence number of the sent message to the caller.
    let seq = resp.sequence();
    Ok(Json(json!({"seq":seq})))
}
Consume messages
spring-stream provides a procedural macro, stream_listener, for subscribing to messages from one or more topics. For example:
use spring::tracing;
use spring::App;
use spring_stream::consumer::Consumers;
use spring_stream::extractor::Json;
use spring_stream::file::AutoStreamReset;
use spring_stream::handler::TypedConsumer;
use spring_stream::stream_listener;
use spring_stream::{file::FileConsumerOptions, StreamConfigurator, StreamPlugin};
use stream_file_example::Payload;

#[tokio::main]
async fn main() {
    App::new()
        .add_plugin(StreamPlugin)
        .add_consumer(consumers())
        .run()
        .await
}

// Collect every listener that should be registered with the app.
fn consumers() -> Consumers {
    Consumers::new().typed_consumer(listen_topic_do_something)
}

// Subscribe to both "topic" and "topic2".
#[stream_listener(
    "topic",
    "topic2",
    file_consumer_options = fill_file_consumer_options
)]
async fn listen_topic_do_something(Json(payload): Json<Payload>) {
    tracing::info!("{:#?}", payload);
}

// Customize the file backend's consumer options.
fn fill_file_consumer_options(opts: &mut FileConsumerOptions) {
    opts.set_auto_stream_reset(AutoStreamReset::Earliest);
}
For complete example code, see stream-file-example, stream-redis-example, and stream-kafka-example.
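The consumer example above sets AutoStreamReset::Earliest. As a rough illustration of what that choice means, here is a toy model that uses only the standard library. It assumes the usual earliest/latest semantics (replay the backlog versus receive only new messages); Reset and messages_seen are hypothetical names and do not reflect how sea-streamer is implemented.

```rust
// Toy model only: `Reset` and `messages_seen` are hypothetical names that
// mirror the idea behind AutoStreamReset, not sea-streamer's implementation.
#[derive(Clone, Copy, Debug)]
enum Reset {
    Earliest, // replay everything already stored in the stream
    Latest,   // skip the backlog and receive only new messages
}

/// Returns the messages a fresh consumer would observe, given the full log
/// and how many messages had been written before it subscribed.
fn messages_seen<'a>(
    log: &[&'a str],
    written_before_subscribe: usize,
    reset: Reset,
) -> Vec<&'a str> {
    let start = match reset {
        Reset::Earliest => 0,
        // Clamp so an out-of-range count cannot panic when slicing.
        Reset::Latest => written_before_subscribe.min(log.len()),
    };
    log[start..].to_vec()
}

fn main() {
    let log = ["m1", "m2", "m3", "m4"];
    // The consumer subscribes after m1 and m2 were already written.
    println!("{:?}", messages_seen(&log, 2, Reset::Earliest));
    println!("{:?}", messages_seen(&log, 2, Reset::Latest));
}
```

Under this model, Earliest suits consumers that must not miss messages produced before they started, while Latest suits consumers that only care about new traffic.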
Read configuration
You can use Config to extract configuration from the TOML file. The usage is exactly the same as in spring-web.
#[derive(Debug, Configurable, Deserialize)]
#[config_prefix = "custom"]
struct CustomConfig {
    a: u32,
    b: bool,
}

#[stream_listener("topic")]
async fn use_toml_config(Config(conf): Config<CustomConfig>) -> impl IntoResponse {
    format!("a={}, b={}", conf.a, conf.b)
}
Add the corresponding configuration to your configuration file:
[custom]
a = 1
b = true