Skip to content

Commit af4d7c4

Browse files
authored
Remove old code based on map storage of stream (#5)
1 parent ee8d71c commit af4d7c4

File tree

3 files changed

+40
-94
lines changed

3 files changed

+40
-94
lines changed

server/src/main.rs

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,32 @@
1-
use actix_web::{put, get, App, HttpRequest, HttpResponse, HttpServer};
2-
mod stream;
3-
mod s3;
1+
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
42

3+
mod s3;
54

5+
fn read_config() -> s3::ConfigToml {
6+
s3::read_config("Config.toml")
7+
}
68

7-
#[put("/{stream}")]
89
async fn put_stream(req: HttpRequest) -> HttpResponse {
910
let stream_name: String = req.match_info().get("stream").unwrap().parse().unwrap();
1011
let stream_name_clone = stream_name.clone();
11-
match stream::insert_stream(stream_name, stream::Stream::empty()) {
12-
None => HttpResponse::Ok().body(format!("Created Stream {}", stream_name_clone)),
13-
Some(_) => {
14-
HttpResponse::Ok().body(format!("Updated Stream {}", stream_name_clone))
15-
}
16-
}
17-
}
12+
let s3_bucket = read_config().s3.aws_bucket_name;
13+
let s3_client = s3::init_s3client(read_config());
1814

19-
#[get("/list")]
20-
async fn list_stream() -> HttpResponse {
21-
let map = stream::STREAMS.lock().unwrap();
22-
for (k, _) in map.iter() {
23-
println!("key={}", k);
15+
match s3::create_stream(Some(s3_client), s3_bucket, stream_name) {
16+
Ok(_) => HttpResponse::Ok().body(format!("Created Stream {}", stream_name_clone)),
17+
Err(_) => {
18+
HttpResponse::Ok().body(format!("Failed to create Stream {}", stream_name_clone))
19+
}
2420
}
25-
HttpResponse::Ok().body("Listed Stream")
2621
}
2722

2823
#[actix_web::main]
2924
async fn main() -> std::io::Result<()> {
30-
let read_config = s3::read_config("Config.toml");
31-
let init_s3client = s3::init_s3client(read_config);
32-
let create_stream = s3::create_stream(init_s3client.0,init_s3client.1, "stream_name");
33-
println!("{:?}", create_stream);
34-
HttpServer::new(|| App::new().service(put_stream).service(list_stream))
25+
HttpServer::new(|| {
26+
App::new()
27+
.route("/{stream}", web::put().to(put_stream))
28+
})
3529
.bind("127.0.0.1:8080")?
3630
.run()
3731
.await
38-
}
32+
}

server/src/s3.rs

Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,21 @@
11
use std::fs;
22
use serde::Deserialize;
33
use std::env;
4-
use std::path::Path;
54
use http::{Uri};
6-
use aws_sdk_s3::{Client,ByteStream,Config,Endpoint,Error};
5+
use aws_sdk_s3::{Client,Config,Endpoint,Error};
76

8-
9-
#[derive(Deserialize)]
10-
#[derive(Debug)]
7+
#[derive(Debug,Deserialize)]
118
pub struct ConfigToml {
12-
s3: S3,
9+
pub s3: S3,
1310
}
1411

15-
#[derive(Deserialize)]
16-
#[derive(Debug)]
17-
struct S3 {
12+
#[derive(Debug,Deserialize)]
13+
pub struct S3 {
1814
aws_access_key_id: String,
1915
aws_secret_key: String,
2016
aws_default_region: String,
2117
aws_endpoint_url: String,
22-
aws_bucket_name: String,
18+
pub aws_bucket_name: String,
2319
}
2420

2521
pub fn read_config(file_name: &str) -> ConfigToml {
@@ -28,12 +24,10 @@ pub fn read_config(file_name: &str) -> ConfigToml {
2824
config_info
2925
}
3026

31-
pub fn init_s3client(config_info: ConfigToml) -> (aws_sdk_s3::Client, String) {
32-
27+
pub fn init_s3client(config_info: ConfigToml) -> aws_sdk_s3::Client {
3328
let ( secret_key, access_key, region, endpoint_url, bucket_name ) = ("AWS_SECRET_ACCESS_KEY", "AWS_ACCESS_KEY_ID", "AWS_DEFAULT_REGION", "AWS_ENDPOINT_URL", "AWS_BUCKET_NAME");
3429
let data = vec![secret_key, access_key, region, endpoint_url];
3530

36-
3731
for data in data.iter() {
3832
match data {
3933
&"AWS_SECRET_ACCESS_KEY" => env::set_var(secret_key, &config_info.s3.aws_secret_key),
@@ -42,40 +36,30 @@ pub fn init_s3client(config_info: ConfigToml) -> (aws_sdk_s3::Client, String) {
4236
&"AWS_ENDPOINT_URL" => env::set_var(endpoint_url, &config_info.s3.aws_endpoint_url),
4337
&"AWS_BUCKET_NAME" => env::set_var(bucket_name, &config_info.s3.aws_bucket_name),
4438
_ => println!("{:?}",config_info),
39+
}
4540
}
46-
}
47-
48-
println!("{}", &config_info.s3.aws_bucket_name);
49-
5041
let ep = env::var("AWS_ENDPOINT_URL").unwrap_or("none".to_string());
5142
let uri = ep.parse::<Uri>().unwrap();
5243
let endpoint = Endpoint::immutable(uri);
5344
let config = Config::builder().endpoint_resolver(endpoint).build();
54-
55-
return (Client::from_conf(config), config_info.s3.aws_bucket_name)
45+
Client::from_conf(config)
5646
}
5747

5848
#[tokio::main]
59-
pub async fn create_stream(s3_client: aws_sdk_s3::Client,bucket_name: String, stream_name: &str) -> Result<(), Error> {
60-
61-
let body = ByteStream::from_path(Path::new("Cargo.toml")).await;
62-
63-
match body {
64-
Ok(b) => {
65-
let resp = s3_client
66-
.put_object()
67-
.bucket(bucket_name)
68-
.key(format!("{}{}", stream_name, "/.schema"))
69-
.body(b)
70-
.send()
71-
.await?;
72-
49+
pub async fn create_stream(s3_client: Option<aws_sdk_s3::Client>,bucket_name: String, stream_name: String) -> Result<(), Error> {
50+
match s3_client {
51+
Some(client) => {
52+
let resp = client
53+
.put_object()
54+
.bucket(bucket_name)
55+
.key(format!("{}{}", stream_name, "/.schema"))
56+
.send()
57+
.await?;
7358
println!("Upload success. Version: {:?}", resp.version_id);
74-
}
75-
Err(e) => {
76-
println!("Got an error DOING SOMETHING:");
77-
println!("{}", e);
78-
}
59+
Ok(())
60+
},
61+
_ => {
62+
Ok(())
63+
},
7964
}
80-
Ok(())
8165
}

server/src/stream.rs

Lines changed: 0 additions & 32 deletions
This file was deleted.

0 commit comments

Comments
 (0)