Skip to content

Commit 09032db

Browse files
committed
perf(tracing): add more log for streamable http session.
1 parent f8ac00a commit 09032db

File tree

1 file changed

+12
-4
lines changed
  • crates/rmcp/src/transport/streamable_http_server

1 file changed

+12
-4
lines changed

crates/rmcp/src/transport/streamable_http_server/session.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,10 @@ impl CachedTx {
120120
} else {
121121
self.cache.push_back(message.clone());
122122
}
123-
let _ = self.tx.send(message).await;
123+
let _ = self.tx.send(message).await.inspect_err(|e| {
124+
let event_id = &e.0.event_id;
125+
tracing::trace!(%event_id, "trying to send message in a closed session")
126+
});
124127
}
125128

126129
async fn sync(&mut self, index: usize) -> Result<(), SessionError> {
@@ -211,16 +214,19 @@ pub struct StreamableHttpMessageReceiver {
211214

212215
impl SessionContext {
213216
fn unregister_resource(&mut self, resource: &ResourceKey) {
214-
if let Some(http_key) = self.resource_router.remove(resource) {
215-
if let Some(channel) = self.tx_router.get_mut(&http_key) {
217+
if let Some(http_request_id) = self.resource_router.remove(resource) {
218+
tracing::trace!(?resource, http_request_id, "unregister resource");
219+
if let Some(channel) = self.tx_router.get_mut(&http_request_id) {
216220
channel.resources.remove(resource);
217221
if channel.resources.is_empty() {
218-
self.tx_router.remove(&http_key);
222+
tracing::debug!(http_request_id, "close http request wise channel");
223+
self.tx_router.remove(&http_request_id);
219224
}
220225
}
221226
}
222227
}
223228
fn register_resource(&mut self, resource: ResourceKey, http_request_id: HttpRequestId) {
229+
tracing::trace!(?resource, http_request_id, "register resource");
224230
if let Some(channel) = self.tx_router.get_mut(&http_request_id) {
225231
channel.resources.insert(resource.clone());
226232
self.resource_router.insert(resource, http_request_id);
@@ -276,6 +282,7 @@ impl SessionContext {
276282
tx: CachedTx::new(tx, Some(http_request_id)),
277283
},
278284
);
285+
tracing::debug!(http_request_id, "establish new request wise channel");
279286
Ok(StreamableHttpMessageReceiver {
280287
http_request_id: Some(http_request_id),
281288
inner: rx,
@@ -733,6 +740,7 @@ pub fn create_session(id: SessionId, config: SessionConfig) -> (Session, Session
733740
let (event_tx, event_rx) = tokio::sync::mpsc::channel(config.channel_capacity);
734741
let (common_tx, _) = tokio::sync::mpsc::channel(config.channel_capacity);
735742
let common = CachedTx::new_common(common_tx);
743+
tracing::info!(session_id = ?id, "create new session");
736744
let session_context = SessionContext {
737745
next_http_request_id: 0,
738746
id,

0 commit comments

Comments
 (0)