Skip to content

Commit 0f24a2e

Browse files
committed
Add userchats to metastore
1 parent f9a8b11 commit 0f24a2e

File tree

4 files changed

+57
-0
lines changed

4 files changed

+57
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ futures-core = "0.3.31"
142142
tempfile = "3.20.0"
143143
lazy_static = "1.4.0"
144144
prost = "0.13.1"
145+
dashmap = "6.1.0"
145146

146147
[build-dependencies]
147148
cargo_toml = "0.21"

src/metastore/metastore_traits.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::collections::{BTreeMap, HashSet};
2121
use arrow_schema::Schema;
2222
use bytes::Bytes;
2323
use chrono::{DateTime, Utc};
24+
use dashmap::DashMap;
2425
use erased_serde::Serialize as ErasedSerialize;
2526
use tonic::async_trait;
2627

@@ -58,6 +59,11 @@ pub trait Metastore: std::fmt::Debug + Send + Sync {
5859
async fn put_dashboard(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
5960
async fn delete_dashboard(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
6061

62+
/// chats
63+
async fn get_chats(&self) -> Result<DashMap<String, Vec<Bytes>>, MetastoreError>;
64+
async fn put_chat(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
65+
async fn delete_chat(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
66+
6167
/// filters
6268
async fn get_filters(&self) -> Result<Vec<Filter>, MetastoreError>;
6369
async fn put_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;

src/metastore/metastores/object_store_metastore.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::{
2424
use arrow_schema::Schema;
2525
use bytes::Bytes;
2626
use chrono::{DateTime, Utc};
27+
use dashmap::DashMap;
2728
use http::StatusCode;
2829
use relative_path::RelativePathBuf;
2930
use tonic::async_trait;
@@ -188,6 +189,54 @@ impl Metastore for ObjectStoreMetastore {
188189
.await?)
189190
}
190191

192+
/// Fetch all chats
193+
async fn get_chats(&self) -> Result<DashMap<String, Vec<Bytes>>, MetastoreError> {
194+
let all_user_chats = DashMap::new();
195+
196+
let users_dir = RelativePathBuf::from(USERS_ROOT_DIR);
197+
for user in self.storage.list_dirs_relative(&users_dir).await? {
198+
if user.starts_with(".") {
199+
continue;
200+
}
201+
let mut chats = Vec::new();
202+
let chats_path = users_dir.join(&user).join("chats");
203+
let user_chats = self
204+
.storage
205+
.get_objects(
206+
Some(&chats_path),
207+
Box::new(|file_name| file_name.ends_with(".json")),
208+
)
209+
.await?;
210+
for chat in user_chats {
211+
chats.push(chat);
212+
}
213+
214+
all_user_chats.insert(user, chats);
215+
}
216+
217+
Ok(all_user_chats)
218+
}
219+
220+
/// Save a chat
221+
async fn put_chat(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> {
222+
// we need the path to store in obj store
223+
let path = obj.get_object_path();
224+
225+
Ok(self
226+
.storage
227+
.put_object(&RelativePathBuf::from(path), to_bytes(obj))
228+
.await?)
229+
}
230+
231+
/// Delete a chat
232+
async fn delete_chat(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> {
233+
let path = obj.get_object_path();
234+
Ok(self
235+
.storage
236+
.delete_object(&RelativePathBuf::from(path))
237+
.await?)
238+
}
239+
191240
// for get filters, take care of migration and removal of incorrect/old filters
192241
// return deserialized filter
193242
async fn get_filters(&self) -> Result<Vec<Filter>, MetastoreError> {

0 commit comments

Comments
 (0)