Skip to content

Commit ab926dd

Browse files
authored
[router][grpc] Fix streaming bugs: empty tool names, state pollution, and panics (#11373)
1 parent a4b424c commit ab926dd

33 files changed

+1107
-496
lines changed

sgl-router/benches/tool_parser_benchmark.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use criterion::{black_box, criterion_group, BenchmarkId, Criterion, Throughput};
1111
use serde_json::json;
1212
use sglang_router_rs::protocols::spec::{Function, Tool};
13-
use sglang_router_rs::tool_parser::{JsonParser, ToolParser, ToolParserFactory};
13+
use sglang_router_rs::tool_parser::{JsonParser, ParserFactory as ToolParserFactory, ToolParser};
1414
use std::collections::BTreeMap;
1515
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1616
use std::sync::{Arc, Mutex};

sgl-router/src/reasoning_parser/factory.rs

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,15 @@ impl ParserRegistry {
8282
}
8383
}
8484

85-
/// Get a parser by exact name (creates new instance, not pooled).
86-
/// Use this for compatibility or when you need a fresh instance.
87-
pub fn get_parser(&self, name: &str) -> Option<Box<dyn ReasoningParser>> {
85+
/// Check whether a creator is registered under exactly `name` (plain key lookup, no model-pattern matching); no parser is instantiated.
86+
pub fn has_parser(&self, name: &str) -> bool {
87+
let creators = self.creators.read().unwrap();
88+
creators.contains_key(name)
89+
}
90+
91+
/// Create a fresh parser instance by exact name (not pooled).
92+
/// Returns a new parser instance for each call (useful for streaming, where state isolation is needed), or `None` if no creator is registered under `name`.
93+
pub fn create_parser(&self, name: &str) -> Option<Box<dyn ReasoningParser>> {
8894
let creators = self.creators.read().unwrap();
8995
creators.get(name).map(|creator| creator())
9096
}
@@ -102,14 +108,30 @@ impl ParserRegistry {
102108
None
103109
}
104110

105-
/// Find a parser for a given model ID by pattern matching (creates new instance).
106-
pub fn find_parser_for_model(&self, model_id: &str) -> Option<Box<dyn ReasoningParser>> {
111+
/// Check if a parser can be created for a specific model without actually creating it.
112+
/// Returns true only if the first pattern (in iteration order) that is a case-insensitive substring of `model_id` maps to a registered parser name; later patterns are not consulted, and no match at all yields false.
113+
pub fn has_parser_for_model(&self, model_id: &str) -> bool {
114+
let patterns = self.patterns.read().unwrap();
115+
let model_lower = model_id.to_lowercase();
116+
117+
for (pattern, parser_name) in patterns.iter() {
118+
if model_lower.contains(&pattern.to_lowercase()) {
119+
let creators = self.creators.read().unwrap();
120+
return creators.contains_key(parser_name);
121+
}
122+
}
123+
false
124+
}
125+
126+
/// Create a fresh parser instance for a given model ID by pattern matching (not pooled).
127+
/// Returns a new parser instance for each call - useful for streaming where state isolation is needed.
128+
pub fn create_for_model(&self, model_id: &str) -> Option<Box<dyn ReasoningParser>> {
107129
let patterns = self.patterns.read().unwrap();
108130
let model_lower = model_id.to_lowercase();
109131

110132
for (pattern, parser_name) in patterns.iter() {
111133
if model_lower.contains(&pattern.to_lowercase()) {
112-
return self.get_parser(parser_name);
134+
return self.create_parser(parser_name);
113135
}
114136
}
115137
None
@@ -131,11 +153,11 @@ impl Default for ParserRegistry {
131153

132154
/// Factory for creating reasoning parsers based on model type.
133155
#[derive(Clone)]
134-
pub struct ReasoningParserFactory {
156+
pub struct ParserFactory {
135157
registry: ParserRegistry,
136158
}
137159

138-
impl ReasoningParserFactory {
160+
impl ParserFactory {
139161
/// Create a new factory with default parsers registered.
140162
pub fn new() -> Self {
141163
let registry = ParserRegistry::new();
@@ -211,7 +233,7 @@ impl ReasoningParserFactory {
211233
/// Use this when you need an isolated parser instance.
212234
pub fn create(&self, model_id: &str) -> Result<Box<dyn ReasoningParser>, ParseError> {
213235
// First try to find by pattern
214-
if let Some(parser) = self.registry.find_parser_for_model(model_id) {
236+
if let Some(parser) = self.registry.create_for_model(model_id) {
215237
return Ok(parser);
216238
}
217239

@@ -240,7 +262,7 @@ impl ReasoningParserFactory {
240262
}
241263
}
242264

243-
impl Default for ReasoningParserFactory {
265+
impl Default for ParserFactory {
244266
fn default() -> Self {
245267
Self::new()
246268
}
@@ -252,35 +274,35 @@ mod tests {
252274

253275
#[test]
254276
fn test_factory_creates_deepseek_r1() {
255-
let factory = ReasoningParserFactory::new();
277+
let factory = ParserFactory::new();
256278
let parser = factory.create("deepseek-r1-distill").unwrap();
257279
assert_eq!(parser.model_type(), "deepseek_r1");
258280
}
259281

260282
#[test]
261283
fn test_factory_creates_qwen3() {
262-
let factory = ReasoningParserFactory::new();
284+
let factory = ParserFactory::new();
263285
let parser = factory.create("qwen3-7b").unwrap();
264286
assert_eq!(parser.model_type(), "qwen3");
265287
}
266288

267289
#[test]
268290
fn test_factory_creates_kimi() {
269-
let factory = ReasoningParserFactory::new();
291+
let factory = ParserFactory::new();
270292
let parser = factory.create("kimi-chat").unwrap();
271293
assert_eq!(parser.model_type(), "kimi");
272294
}
273295

274296
#[test]
275297
fn test_factory_fallback_to_passthrough() {
276-
let factory = ReasoningParserFactory::new();
298+
let factory = ParserFactory::new();
277299
let parser = factory.create("unknown-model").unwrap();
278300
assert_eq!(parser.model_type(), "passthrough");
279301
}
280302

281303
#[test]
282304
fn test_case_insensitive_matching() {
283-
let factory = ReasoningParserFactory::new();
305+
let factory = ParserFactory::new();
284306
let parser1 = factory.create("DeepSeek-R1").unwrap();
285307
let parser2 = factory.create("QWEN3").unwrap();
286308
let parser3 = factory.create("Kimi").unwrap();
@@ -292,21 +314,21 @@ mod tests {
292314

293315
#[test]
294316
fn test_step3_model() {
295-
let factory = ReasoningParserFactory::new();
317+
let factory = ParserFactory::new();
296318
let step3 = factory.create("step3-model").unwrap();
297319
assert_eq!(step3.model_type(), "step3");
298320
}
299321

300322
#[test]
301323
fn test_glm45_model() {
302-
let factory = ReasoningParserFactory::new();
324+
let factory = ParserFactory::new();
303325
let glm45 = factory.create("glm45-v2").unwrap();
304326
assert_eq!(glm45.model_type(), "glm45");
305327
}
306328

307329
#[tokio::test]
308330
async fn test_pooled_parser_reuse() {
309-
let factory = ReasoningParserFactory::new();
331+
let factory = ParserFactory::new();
310332

311333
// Get the same parser twice - should be the same instance
312334
let parser1 = factory.get_pooled("deepseek-r1");
@@ -322,7 +344,7 @@ mod tests {
322344

323345
#[tokio::test]
324346
async fn test_pooled_parser_concurrent_access() {
325-
let factory = ReasoningParserFactory::new();
347+
let factory = ParserFactory::new();
326348
let parser = factory.get_pooled("deepseek-r1");
327349

328350
// Spawn multiple async tasks that use the same parser
@@ -348,7 +370,7 @@ mod tests {
348370

349371
#[tokio::test]
350372
async fn test_pool_clearing() {
351-
let factory = ReasoningParserFactory::new();
373+
let factory = ParserFactory::new();
352374

353375
// Get a pooled parser
354376
let parser1 = factory.get_pooled("deepseek-r1");
@@ -365,7 +387,7 @@ mod tests {
365387

366388
#[tokio::test]
367389
async fn test_passthrough_parser_pooling() {
368-
let factory = ReasoningParserFactory::new();
390+
let factory = ParserFactory::new();
369391

370392
// Unknown models should get passthrough parser
371393
let parser1 = factory.get_pooled("unknown-model-1");
@@ -383,7 +405,7 @@ mod tests {
383405
use std::sync::atomic::{AtomicUsize, Ordering};
384406
use std::time::Instant;
385407

386-
let factory = ReasoningParserFactory::new();
408+
let factory = ParserFactory::new();
387409
let num_tasks = 100;
388410
let requests_per_task = 50;
389411
let models = vec!["deepseek-r1", "qwen3", "kimi", "qwen3-thinking"];
@@ -512,7 +534,7 @@ mod tests {
512534

513535
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
514536
async fn test_concurrent_pool_modifications() {
515-
let factory = ReasoningParserFactory::new();
537+
let factory = ParserFactory::new();
516538
let mut handles = vec![];
517539

518540
// Task 1: Continuously get parsers

sgl-router/src/reasoning_parser/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ pub mod factory;
22
pub mod parsers;
33
pub mod traits;
44

5-
pub use factory::{ParserRegistry, PooledParser, ReasoningParserFactory};
5+
pub use factory::{ParserFactory, ParserRegistry, PooledParser};
66
pub use parsers::{
77
BaseReasoningParser, DeepSeekR1Parser, Glm45Parser, KimiParser, Qwen3Parser,
88
QwenThinkingParser, Step3Parser,

sgl-router/src/routers/grpc/context.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ use crate::grpc_client::{proto, SglangSchedulerClient};
1515
use crate::protocols::spec::{
1616
ChatCompletionRequest, ChatCompletionResponse, GenerateRequest, GenerateResponse,
1717
};
18-
use crate::reasoning_parser::ReasoningParserFactory;
18+
use crate::reasoning_parser::ParserFactory as ReasoningParserFactory;
1919
use crate::tokenizer::stop::StopSequenceDecoder;
2020
use crate::tokenizer::traits::Tokenizer;
21-
use crate::tool_parser::ToolParserFactory;
21+
use crate::tool_parser::ParserFactory as ToolParserFactory;
2222

2323
// ============================================================================
2424
// Core Context Types

sgl-router/src/routers/grpc/pd_router.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ use crate::protocols::spec::{
77
ChatCompletionRequest, CompletionRequest, EmbeddingRequest, GenerateRequest, RerankRequest,
88
ResponsesGetParams, ResponsesRequest,
99
};
10-
use crate::reasoning_parser::ReasoningParserFactory;
10+
use crate::reasoning_parser::ParserFactory as ReasoningParserFactory;
1111
use crate::routers::RouterTrait;
1212
use crate::server::AppContext;
1313
use crate::tokenizer::traits::Tokenizer;
14-
use crate::tool_parser::ToolParserFactory;
14+
use crate::tool_parser::ParserFactory as ToolParserFactory;
1515
use async_trait::async_trait;
1616
use axum::{
1717
body::Body,

sgl-router/src/routers/grpc/processing.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@ use crate::protocols::spec::{
1313
ChatChoice, ChatCompletionMessage, ChatCompletionRequest, FunctionCallResponse, ToolCall,
1414
ToolChoice, ToolChoiceValue,
1515
};
16-
use crate::reasoning_parser::ReasoningParserFactory;
16+
use crate::reasoning_parser::ParserFactory as ReasoningParserFactory;
1717
use crate::tokenizer::stop::{SequenceDecoderOutput, StopSequenceDecoder};
1818
use crate::tokenizer::traits::Tokenizer;
19-
use crate::tool_parser::ToolParserFactory;
19+
use crate::tool_parser::ParserFactory as ToolParserFactory;
2020

2121
use super::utils;
2222

sgl-router/src/routers/grpc/router.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ use crate::protocols::spec::{
1818
ChatCompletionRequest, CompletionRequest, EmbeddingRequest, GenerateRequest, RerankRequest,
1919
ResponsesGetParams, ResponsesRequest,
2020
};
21-
use crate::reasoning_parser::ReasoningParserFactory;
21+
use crate::reasoning_parser::ParserFactory as ReasoningParserFactory;
2222
use crate::routers::RouterTrait;
2323
use crate::server::AppContext;
2424
use crate::tokenizer::traits::Tokenizer;
25-
use crate::tool_parser::ToolParserFactory;
25+
use crate::tool_parser::ParserFactory as ToolParserFactory;
2626

2727
/// gRPC router implementation for SGLang
2828
#[derive(Clone)]

0 commit comments

Comments
 (0)