Skip to content

Commit 262f4a2

Browse files
committed
coderabbit suggestions
1 parent 5e29431 commit 262f4a2

File tree

3 files changed

+38
-14
lines changed

3 files changed

+38
-14
lines changed

src/handlers/http/logstream.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,19 @@ pub async fn delete_stream_hot_tier(
473473

474474
hot_tier_manager.delete_hot_tier(&stream_name).await?;
475475

476+
let mut stream_metadata: ObjectStoreFormat = serde_json::from_slice(
477+
&PARSEABLE
478+
.metastore
479+
.get_stream_json(&stream_name, false)
480+
.await?,
481+
)?;
482+
stream_metadata.hot_tier_enabled = false;
483+
484+
PARSEABLE
485+
.metastore
486+
.put_stream_json(&stream_metadata, &stream_name)
487+
.await?;
488+
476489
Ok((
477490
format!("hot tier deleted for stream {stream_name}"),
478491
StatusCode::OK,

src/storage/localfs.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -139,18 +139,29 @@ impl ObjectStorage for LocalFS {
139139
}
140140
async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
141141
let time = Instant::now();
142-
let file_path = if path.to_string().contains(&self.root.to_str().unwrap()[1..]) {
143-
#[cfg(windows)]
144-
{
145-
path.to_path("")
146-
}
147-
#[cfg(not(windows))]
148-
{
142+
143+
let file_path;
144+
145+
// this is for the `get_manifest()` function because inside a snapshot, we store the absolute path (without `/`) on linux based OS
146+
// `home/user/.../manifest.json`
147+
// on windows, the path is stored with the drive letter
148+
// `D:\\parseable\\data..\\manifest.json`
149+
// thus, we need to check if the root of localfs is already present in the path
150+
#[cfg(windows)]
151+
{
152+
// in windows the absolute path (self.root) doesn't matter because we store the complete path
153+
file_path = path.to_path("");
154+
}
155+
#[cfg(not(windows))]
156+
{
157+
// absolute path (self.root) will always start with `/`
158+
let root_str = self.root.to_str().unwrap();
159+
file_path = if path.to_string().contains(&root_str[1..]) && root_str.len() > 1 {
149160
path.to_path("/")
150-
}
151-
} else {
152-
self.path_in_root(path)
153-
};
161+
} else {
162+
self.path_in_root(path)
163+
};
164+
}
154165

155166
let res: Result<Bytes, ObjectStorageError> = match fs::read(file_path).await {
156167
Ok(x) => Ok(x.into()),

src/storage/object_storage.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
280280
meta: ObjectStoreFormat,
281281
schema: Arc<Schema>,
282282
) -> Result<String, ObjectStorageError> {
283-
let s = &*schema.clone();
283+
let s: Schema = schema.as_ref().clone();
284284
PARSEABLE
285285
.metastore
286286
.put_schema(s.clone(), stream_name)
@@ -955,7 +955,7 @@ fn stream_relative_path(
955955
}
956956

957957
pub fn sync_all_streams(joinset: &mut JoinSet<Result<(), ObjectStorageError>>) {
958-
let object_store = PARSEABLE.storage.get_object_store();
958+
let object_store = PARSEABLE.storage().get_object_store();
959959
for stream_name in PARSEABLE.streams.list() {
960960
let object_store = object_store.clone();
961961
joinset.spawn(async move {
@@ -989,7 +989,7 @@ pub async fn commit_schema_to_storage(
989989
schema,
990990
serde_json::from_slice::<Schema>(&stream_schema)?,
991991
])
992-
.unwrap();
992+
.map_err(|e| ObjectStorageError::Custom(e.to_string()))?;
993993

994994
PARSEABLE
995995
.metastore

0 commit comments

Comments
 (0)