Skip to content
This repository was archived by the owner on Oct 18, 2023. It is now read-only.

Commit 1f8de3f

Browse files
authored
Merge pull request #570 from libsql/alloc-response-message
alloc response message
2 parents 7c93d7e + 137051c commit 1f8de3f

File tree

7 files changed

+102
-38
lines changed

7 files changed

+102
-38
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

libsqlx-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ bincode = "1.3.3"
1414
bytemuck = { version = "1.13.1", features = ["derive"] }
1515
bytes = { version = "1.4.0", features = ["serde"] }
1616
bytesize = { version = "1.2.0", features = ["serde"] }
17+
chrono = { version = "0.4.26", features = ["serde"] }
1718
clap = { version = "4.3.11", features = ["derive"] }
1819
color-eyre = "0.6.2"
1920
either = "1.8.1"

libsqlx-server/src/hrana/error.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,11 @@ pub enum HranaError {
1818
}
1919

2020
impl HranaError {
21-
pub fn code(&self) -> Option<&str>{
21+
pub fn code(&self) -> Option<&str> {
2222
match self {
2323
HranaError::Stmt(e) => Some(e.code()),
2424
HranaError::StreamResponse(e) => Some(e.code()),
25-
HranaError::Stream(_)
26-
| HranaError::Libsqlx(_)
27-
| HranaError::Proto(_) => None,
25+
HranaError::Stream(_) | HranaError::Libsqlx(_) | HranaError::Proto(_) => None,
2826
}
2927
}
3028
}

libsqlx-server/src/http/admin.rs

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,49 @@ use std::sync::Arc;
44
use std::time::Duration;
55

66
use axum::extract::{Path, State};
7+
use axum::response::IntoResponse;
78
use axum::routing::{delete, post};
89
use axum::{Json, Router};
10+
use chrono::{DateTime, Utc};
911
use color_eyre::eyre::Result;
1012
use hyper::server::accept::Accept;
13+
use hyper::StatusCode;
1114
use serde::{Deserialize, Deserializer, Serialize};
1215
use tokio::io::{AsyncRead, AsyncWrite};
1316

1417
use crate::allocation::config::{AllocConfig, DbConfig};
1518
use crate::linc::bus::Bus;
1619
use crate::linc::NodeId;
1720
use crate::manager::Manager;
18-
use crate::meta::DatabaseId;
21+
use crate::meta::{AllocationError, DatabaseId};
22+
23+
impl IntoResponse for crate::error::Error {
24+
fn into_response(self) -> axum::response::Response {
25+
#[derive(Serialize)]
26+
struct ErrorBody {
27+
message: String,
28+
}
29+
30+
let mut resp = Json(ErrorBody {
31+
message: self.to_string(),
32+
})
33+
.into_response();
34+
*resp.status_mut() = match self {
35+
crate::error::Error::Libsqlx(_)
36+
| crate::error::Error::InjectorExited
37+
| crate::error::Error::ConnectionClosed
38+
| crate::error::Error::Io(_)
39+
| crate::error::Error::AllocationClosed
40+
| crate::error::Error::Internal(_)
41+
| crate::error::Error::Heed(_) => StatusCode::INTERNAL_SERVER_ERROR,
42+
crate::error::Error::Allocation(AllocationError::AlreadyExist(_)) => {
43+
StatusCode::BAD_REQUEST
44+
}
45+
};
46+
47+
resp
48+
}
49+
}
1950

2051
pub struct Config {
2152
pub bus: Arc<Bus<Arc<Manager>>>,
@@ -47,7 +78,19 @@ where
4778
struct ErrorResponse {}
4879

4980
#[derive(Serialize, Debug)]
50-
struct AllocateResp {}
81+
#[serde(rename_all = "lowercase")]
82+
enum DbType {
83+
Primary,
84+
Replica,
85+
}
86+
87+
#[derive(Serialize, Debug)]
88+
struct AllocationSummaryView {
89+
created_at: DateTime<Utc>,
90+
database_name: String,
91+
#[serde(rename = "type")]
92+
ty: DbType,
93+
}
5194

5295
#[derive(Deserialize, Debug)]
5396
struct AllocateReq {
@@ -134,7 +177,7 @@ const fn default_txn_timeout() -> HumanDuration {
134177
async fn allocate(
135178
State(state): State<Arc<AdminServerState>>,
136179
Json(req): Json<AllocateReq>,
137-
) -> Result<Json<AllocateResp>, Json<ErrorResponse>> {
180+
) -> crate::Result<Json<AllocationSummaryView>> {
138181
let config = AllocConfig {
139182
max_conccurent_connection: req.max_conccurent_connection.unwrap_or(16),
140183
db_name: req.database_name.clone(),
@@ -164,19 +207,26 @@ async fn allocate(
164207

165208
let dispatcher = state.bus.clone();
166209
let id = DatabaseId::from_name(&req.database_name);
167-
state.bus.handler().allocate(id, &config, dispatcher).await;
210+
let meta = state.bus.handler().allocate(id, config, dispatcher).await?;
168211

169-
Ok(Json(AllocateResp {}))
212+
Ok(Json(AllocationSummaryView {
213+
created_at: meta.created_at,
214+
database_name: meta.config.db_name,
215+
ty: match meta.config.db_config {
216+
DbConfig::Primary {..} => DbType::Primary,
217+
DbConfig::Replica {..} => DbType::Replica,
218+
}
219+
}))
170220
}
171221

172222
async fn deallocate(
173223
State(state): State<Arc<AdminServerState>>,
174224
Path(database_name): Path<String>,
175-
) -> Result<Json<AllocateResp>, Json<ErrorResponse>> {
225+
) -> crate::Result<()> {
176226
let id = DatabaseId::from_name(&database_name);
177-
state.bus.handler().deallocate(id).await;
227+
state.bus.handler().deallocate(id).await?;
178228

179-
Ok(Json(AllocateResp {}))
229+
Ok(())
180230
}
181231

182232
#[derive(Serialize, Debug)]
@@ -191,15 +241,16 @@ struct AllocView {
191241

192242
async fn list_allocs(
193243
State(state): State<Arc<AdminServerState>>,
194-
) -> Result<Json<ListAllocResp>, Json<ErrorResponse>> {
244+
) -> crate::Result<Json<ListAllocResp>> {
195245
let allocs = state
196246
.bus
197247
.handler()
198248
.store()
199-
.list_allocs()
200-
.unwrap()
249+
.list_allocs()?
201250
.into_iter()
202-
.map(|cfg| AllocView { id: cfg.db_name })
251+
.map(|meta| AllocView {
252+
id: meta.config.db_name,
253+
})
203254
.collect();
204255

205256
Ok(Json(ListAllocResp { allocs }))

libsqlx-server/src/http/user/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use axum::response::IntoResponse;
55
use axum::routing::post;
66
use axum::{Json, Router};
77
use color_eyre::Result;
8-
use hyper::StatusCode;
98
use hyper::server::accept::Accept;
9+
use hyper::StatusCode;
1010
use serde::Serialize;
1111
use tokio::io::{AsyncRead, AsyncWrite};
1212

@@ -30,12 +30,12 @@ impl IntoResponse for HranaError {
3030
fn into_response(self) -> axum::response::Response {
3131
let (message, code) = match self.code() {
3232
Some(code) => (self.to_string(), code.to_owned()),
33-
None => ("internal error, please check the logs".to_owned(), "INTERNAL_ERROR".to_owned()),
34-
};
35-
let resp = ErrorResponseBody {
36-
message,
37-
code,
33+
None => (
34+
"internal error, please check the logs".to_owned(),
35+
"INTERNAL_ERROR".to_owned(),
36+
),
3837
};
38+
let resp = ErrorResponseBody { message, code };
3939
let mut resp = Json(resp).into_response();
4040
*resp.status_mut() = StatusCode::BAD_REQUEST;
4141
resp

libsqlx-server/src/manager.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::compactor::CompactionQueue;
1212
use crate::linc::bus::Dispatch;
1313
use crate::linc::handler::Handler;
1414
use crate::linc::Inbound;
15-
use crate::meta::{DatabaseId, Store};
15+
use crate::meta::{AllocMeta, DatabaseId, Store};
1616
use crate::replica_commit_store::ReplicaCommitStore;
1717

1818
pub struct Manager {
@@ -52,24 +52,24 @@ impl Manager {
5252
return Ok(Some(sender.clone()));
5353
}
5454

55-
if let Some(config) = self.meta_store.meta(&database_id)? {
55+
if let Some(meta) = self.meta_store.meta(&database_id)? {
5656
let path = self.db_path.join("dbs").join(database_id.to_string());
5757
tokio::fs::create_dir_all(&path).await?;
5858
let (alloc_sender, inbox) = mpsc::channel(MAX_ALLOC_MESSAGE_QUEUE_LEN);
5959
let alloc = Allocation {
6060
inbox,
6161
database: Database::from_config(
62-
&config,
62+
&meta.config,
6363
path,
6464
dispatcher.clone(),
6565
self.compaction_queue.clone(),
6666
self.replica_commit_store.clone(),
6767
)?,
6868
connections_futs: JoinSet::new(),
6969
next_conn_id: 0,
70-
max_concurrent_connections: config.max_conccurent_connection,
70+
max_concurrent_connections: meta.config.max_conccurent_connection,
7171
dispatcher,
72-
db_name: config.db_name,
72+
db_name: meta.config.db_name,
7373
connections: HashMap::new(),
7474
};
7575

@@ -86,12 +86,13 @@ impl Manager {
8686
pub async fn allocate(
8787
self: &Arc<Self>,
8888
database_id: DatabaseId,
89-
meta: &AllocConfig,
89+
config: AllocConfig,
9090
dispatcher: Arc<dyn Dispatch>,
91-
) -> crate::Result<()> {
92-
self.store().allocate(&database_id, meta)?;
91+
) -> crate::Result<AllocMeta> {
92+
let meta = self.store().allocate(&database_id, config)?;
9393
self.schedule(database_id, dispatcher).await?;
94-
Ok(())
94+
95+
Ok(meta)
9596
}
9697

9798
pub async fn deallocate(&self, database_id: DatabaseId) -> crate::Result<()> {

libsqlx-server/src/meta.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::fmt;
22
use std::mem::size_of;
33

4+
use chrono::{DateTime, Utc};
45
use heed::bytemuck::{Pod, Zeroable};
56
use heed_types::{OwnedType, SerdeBincode};
67
use itertools::Itertools;
@@ -11,9 +12,15 @@ use tokio::task::block_in_place;
1112

1213
use crate::allocation::config::AllocConfig;
1314

15+
#[derive(Debug, Serialize, Deserialize)]
16+
pub struct AllocMeta {
17+
pub config: AllocConfig,
18+
pub created_at: DateTime<Utc>,
19+
}
20+
1421
pub struct Store {
1522
env: heed::Env,
16-
alloc_config_db: heed::Database<OwnedType<DatabaseId>, SerdeBincode<AllocConfig>>,
23+
alloc_config_db: heed::Database<OwnedType<DatabaseId>, SerdeBincode<AllocMeta>>,
1724
}
1825

1926
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug, Hash, Clone, Copy, Pod, Zeroable)]
@@ -73,7 +80,7 @@ impl Store {
7380
})
7481
}
7582

76-
pub fn allocate(&self, id: &DatabaseId, meta: &AllocConfig) -> crate::Result<()> {
83+
pub fn allocate(&self, id: &DatabaseId, config: AllocConfig) -> crate::Result<AllocMeta> {
7784
block_in_place(|| {
7885
let mut txn = self.env.write_txn()?;
7986
if self
@@ -82,14 +89,19 @@ impl Store {
8289
.get(&txn, id)?
8390
.is_some()
8491
{
85-
Err(AllocationError::AlreadyExist(meta.db_name.clone()))?;
92+
Err(AllocationError::AlreadyExist(config.db_name.clone()))?;
8693
};
8794

88-
self.alloc_config_db.put(&mut txn, id, meta)?;
95+
let meta = AllocMeta {
96+
config,
97+
created_at: Utc::now(),
98+
};
99+
100+
self.alloc_config_db.put(&mut txn, id, &meta)?;
89101

90102
txn.commit()?;
91103

92-
Ok(())
104+
Ok(meta)
93105
})
94106
}
95107

@@ -103,14 +115,14 @@ impl Store {
103115
})
104116
}
105117

106-
pub fn meta(&self, id: &DatabaseId) -> crate::Result<Option<AllocConfig>> {
118+
pub fn meta(&self, id: &DatabaseId) -> crate::Result<Option<AllocMeta>> {
107119
block_in_place(|| {
108120
let txn = self.env.read_txn()?;
109121
Ok(self.alloc_config_db.get(&txn, id)?)
110122
})
111123
}
112124

113-
pub fn list_allocs(&self) -> crate::Result<Vec<AllocConfig>> {
125+
pub fn list_allocs(&self) -> crate::Result<Vec<AllocMeta>> {
114126
block_in_place(|| {
115127
let txn = self.env.read_txn()?;
116128
let res = self

0 commit comments

Comments
 (0)