Skip to content

Commit 6187313

Browse files
committed
Add userchats to metastore
1 parent 4a2231d commit 6187313

File tree

8 files changed

+63
-8
lines changed

8 files changed

+63
-8
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ futures-core = "0.3.31"
142142
tempfile = "3.20.0"
143143
lazy_static = "1.4.0"
144144
prost = "0.13.1"
145+
dashmap = "6.1.0"
145146

146147
[build-dependencies]
147148
cargo_toml = "0.21"

src/alerts/alert_types.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,14 +182,14 @@ impl AlertTrait for ThresholdAlert {
182182
&mut self,
183183
new_notification_state: NotificationState,
184184
) -> Result<(), AlertError> {
185+
// update state in memory
186+
self.notification_state = new_notification_state;
187+
185188
// update on disk
186189
PARSEABLE
187190
.metastore
188191
.put_alert(&self.to_alert_config())
189192
.await?;
190-
// update state in memory
191-
self.notification_state = new_notification_state;
192-
193193
Ok(())
194194
}
195195

src/alerts/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -992,11 +992,11 @@ impl actix_web::ResponseError for AlertError {
992992
impl AlertManagerTrait for Alerts {
993993
/// Loads alerts from disk, blocks
994994
async fn load(&self) -> anyhow::Result<()> {
995-
let mut map = self.alerts.write().await;
996-
997995
// Get alerts path and read raw bytes for migration handling
998996
let raw_objects = PARSEABLE.metastore.get_alerts().await?;
999997

998+
let mut map = self.alerts.write().await;
999+
10001000
for raw_bytes in raw_objects {
10011001
// First, try to parse as JSON Value to check version
10021002
let json_value: JsonValue = match serde_json::from_slice(&raw_bytes) {

src/handlers/http/logstream.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,7 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
9090
let res = PARSEABLE
9191
.metastore
9292
.list_streams()
93-
.await
94-
.unwrap()
93+
.await?
9594
.into_iter()
9695
.filter(|logstream| {
9796
Users.authorize(key.clone(), Action::ListStream, Some(logstream), None)

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,6 @@ pub async fn get_stats(
163163
})?;
164164

165165
if !date_value.is_empty() {
166-
// this function requires all the ingestor stream jsons
167166
let obs = PARSEABLE
168167
.metastore
169168
.get_all_stream_jsons(&stream_name, None)

src/metastore/metastore_traits.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::collections::{BTreeMap, HashSet};
2121
use arrow_schema::Schema;
2222
use bytes::Bytes;
2323
use chrono::{DateTime, Utc};
24+
use dashmap::DashMap;
2425
use erased_serde::Serialize as ErasedSerialize;
2526
use tonic::async_trait;
2627

@@ -58,6 +59,11 @@ pub trait Metastore: std::fmt::Debug + Send + Sync {
5859
async fn put_dashboard(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
5960
async fn delete_dashboard(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
6061

62+
/// chats
63+
async fn get_chats(&self) -> Result<DashMap<String, Vec<Bytes>>, MetastoreError>;
64+
async fn put_chat(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
65+
async fn delete_chat(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
66+
6167
/// filters
6268
async fn get_filters(&self) -> Result<Vec<Filter>, MetastoreError>;
6369
async fn put_filter(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;

src/metastore/metastores/object_store_metastore.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::{
2424
use arrow_schema::Schema;
2525
use bytes::Bytes;
2626
use chrono::{DateTime, Utc};
27+
use dashmap::DashMap;
2728
use http::StatusCode;
2829
use relative_path::RelativePathBuf;
2930
use tonic::async_trait;
@@ -188,6 +189,54 @@ impl Metastore for ObjectStoreMetastore {
188189
.await?)
189190
}
190191

192+
/// Fetch all chats
193+
async fn get_chats(&self) -> Result<DashMap<String, Vec<Bytes>>, MetastoreError> {
194+
let all_user_chats = DashMap::new();
195+
196+
let users_dir = RelativePathBuf::from(USERS_ROOT_DIR);
197+
for user in self.storage.list_dirs_relative(&users_dir).await? {
198+
if user.starts_with(".") {
199+
continue;
200+
}
201+
let mut chats = Vec::new();
202+
let chats_path = users_dir.join(&user).join("chats");
203+
let user_chats = self
204+
.storage
205+
.get_objects(
206+
Some(&chats_path),
207+
Box::new(|file_name| file_name.ends_with(".json")),
208+
)
209+
.await?;
210+
for chat in user_chats {
211+
chats.push(chat);
212+
}
213+
214+
all_user_chats.insert(user, chats);
215+
}
216+
217+
Ok(all_user_chats)
218+
}
219+
220+
/// Save a chat
221+
async fn put_chat(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> {
222+
// we need the path to store in obj store
223+
let path = obj.get_object_path();
224+
225+
Ok(self
226+
.storage
227+
.put_object(&RelativePathBuf::from(path), to_bytes(obj))
228+
.await?)
229+
}
230+
231+
/// Delete a chat
232+
async fn delete_chat(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> {
233+
let path = obj.get_object_path();
234+
Ok(self
235+
.storage
236+
.delete_object(&RelativePathBuf::from(path))
237+
.await?)
238+
}
239+
191240
// for get filters, take care of migration and removal of incorrect/old filters
192241
// return deserialized filter
193242
async fn get_filters(&self) -> Result<Vec<Filter>, MetastoreError> {

0 commit comments

Comments
 (0)