Skip to content

Commit 1730f0f

Browse files
committed
Refactor message parsing functions and update tests
- Remove parse_msg_args and parse_hmsg_args in favor of async parse_msg and parse_hmsg - Rename and update protocol parsing functions for clarity - Update tests to use new async parsing functions
1 parent b141c5d commit 1730f0f

File tree

2 files changed

+81
-137
lines changed

2 files changed

+81
-137
lines changed

nats-client/src/nats/client/protocol/message.py

Lines changed: 19 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -147,80 +147,7 @@ def parse_control_line(line: bytes) -> tuple[str, list[str]]:
147147
raise ParseError(msg) from e
148148

149149

150-
def parse_msg_args(args: list[str]) -> tuple[str, str, str | None, int]:
151-
"""Parse MSG arguments into components.
152150

153-
Args:
154-
args: MSG command arguments
155-
156-
Returns:
157-
Tuple of (subject, sid, reply_to, payload_size)
158-
159-
Raises:
160-
ParseError: If arguments are invalid
161-
"""
162-
match len(args):
163-
case 0 | 1 | 2:
164-
msg = "Invalid MSG: not enough arguments"
165-
raise ParseError(msg)
166-
case 3:
167-
subject, sid, size_str = args
168-
try:
169-
size = int(size_str)
170-
except ValueError as e:
171-
msg = f"Invalid payload size: {size_str}"
172-
raise ParseError(msg) from e
173-
return subject, sid, None, size
174-
case 4:
175-
subject, sid, reply_to, size_str = args
176-
try:
177-
size = int(size_str)
178-
except ValueError as e:
179-
msg = f"Invalid payload size: {size_str}"
180-
raise ParseError(msg) from e
181-
return subject, sid, reply_to, size
182-
case _:
183-
msg = "Invalid MSG: too many arguments"
184-
raise ParseError(msg)
185-
186-
187-
def parse_hmsg_args(args: list[str]) -> tuple[str, str, str, int, int]:
188-
"""Parse HMSG arguments into components.
189-
190-
Args:
191-
args: HMSG command arguments
192-
193-
Returns:
194-
Tuple of (subject, sid, reply_to, header_size, total_size)
195-
196-
Raises:
197-
ParseError: If arguments are invalid
198-
"""
199-
match len(args):
200-
case 0 | 1 | 2 | 3 | 4:
201-
msg = "Invalid HMSG: not enough arguments"
202-
raise ParseError(msg)
203-
case 5:
204-
subject, sid, reply_to, header_size_str, total_size_str = args
205-
try:
206-
header_size = int(header_size_str)
207-
total_size = int(total_size_str)
208-
except ValueError as e:
209-
msg = f"Invalid size values: {header_size_str}, {total_size_str}"
210-
raise ParseError(msg) from e
211-
212-
if header_size > MAX_HEADER_SIZE:
213-
msg = f"Header too large: {header_size} > {MAX_HEADER_SIZE}"
214-
raise ParseError(msg)
215-
216-
if header_size > total_size:
217-
msg = f"Header size {header_size} larger than total size {total_size}"
218-
raise ParseError(msg)
219-
220-
return subject, sid, reply_to, header_size, total_size
221-
case _:
222-
msg = "Invalid HMSG: too many arguments"
223-
raise ParseError(msg)
224151

225152

226153
def parse_headers(
@@ -320,11 +247,11 @@ def parse_err(text: str) -> str:
320247
return text
321248

322249

323-
async def _parse_msg(reader: Reader, args: list[bytes]) -> Msg:
250+
async def parse_msg(reader: Reader, args: list[bytes]) -> Msg:
324251
"""Parse MSG message.
325252
326253
Args:
327-
reader: AsyncIO stream reader
254+
reader: Reader protocol implementation
328255
args: Message arguments
329256
330257
Returns:
@@ -367,11 +294,11 @@ async def _parse_msg(reader: Reader, args: list[bytes]) -> Msg:
367294
return Msg("MSG", subject, sid, reply_to, payload)
368295

369296

370-
async def _parse_hmsg(reader: Reader, args: list[bytes]) -> HMsg:
297+
async def parse_hmsg(reader: Reader, args: list[bytes]) -> HMsg:
371298
"""Parse HMSG message.
372299
373300
Args:
374-
reader: AsyncIO stream reader
301+
reader: Reader protocol implementation
375302
args: Message arguments
376303
377304
Returns:
@@ -432,7 +359,7 @@ async def _parse_hmsg(reader: Reader, args: list[bytes]) -> HMsg:
432359
)
433360

434361

435-
async def _parse_info(args: list[bytes]) -> Info:
362+
async def parse_info(args: list[bytes]) -> Info:
436363
"""Parse INFO message.
437364
438365
Args:
@@ -460,7 +387,7 @@ async def _parse_info(args: list[bytes]) -> Info:
460387
raise ParseError(msg) from e
461388

462389

463-
async def _parse_err(args: list[bytes]) -> Err:
390+
async def parse_err(args: list[bytes]) -> Err:
464391
"""Parse ERR message.
465392
466393
Args:
@@ -487,20 +414,20 @@ async def _parse_err(args: list[bytes]) -> Err:
487414
return Err("ERR", error_text)
488415

489416

490-
async def _parse_ping() -> Ping:
491-
"""Parse PING message.
417+
async def ping() -> Ping:
418+
"""Create PING message.
492419
493420
Returns:
494-
Parsed PING message
421+
PING message
495422
"""
496423
return Ping("PING")
497424

498425

499-
async def _parse_pong() -> Pong:
500-
"""Parse PONG message.
426+
async def pong() -> Pong:
427+
"""Create PONG message.
501428
502429
Returns:
503-
Parsed PONG message
430+
PONG message
504431
"""
505432
return Pong("PONG")
506433

@@ -509,7 +436,7 @@ async def parse(reader: Reader) -> Message | None:
509436
"""Parse a message from the protocol stream.
510437
511438
Args:
512-
reader: AsyncIO stream reader
439+
reader: Reader protocol implementation
513440
514441
Returns:
515442
Parsed protocol message or None if connection closed
@@ -545,17 +472,17 @@ async def parse(reader: Reader) -> Message | None:
545472

546473
match op:
547474
case b"MSG":
548-
return await _parse_msg(reader, args)
475+
return await parse_msg(reader, args)
549476
case b"HMSG":
550-
return await _parse_hmsg(reader, args)
477+
return await parse_hmsg(reader, args)
551478
case b"PING":
552-
return await _parse_ping()
479+
return await ping()
553480
case b"PONG":
554-
return await _parse_pong()
481+
return await pong()
555482
case b"INFO":
556-
return await _parse_info(args)
483+
return await parse_info(args)
557484
case b"ERR":
558-
return await _parse_err(args)
485+
return await parse_err(args)
559486
case _:
560487
# Use repr for better error reporting with control characters
561488
msg = f"Unknown operation: {op!r}"

nats-client/tests/test_protocol.py

Lines changed: 62 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Tests for NATS protocol message parsing and command encoding."""
22

3+
import asyncio
34
import json
45

56
import pytest
@@ -16,8 +17,8 @@
1617
ParseError,
1718
parse_control_line,
1819
parse_headers,
19-
parse_hmsg_args,
20-
parse_msg_args,
20+
parse_hmsg,
21+
parse_msg,
2122
)
2223
from nats.client.protocol.types import ConnectInfo
2324

@@ -72,69 +73,85 @@ def test_parse_control_line():
7273
parse_control_line(b"MSG " + b"x" * 4096)
7374

7475

75-
def test_parse_msg_args():
76-
"""Test parsing MSG arguments."""
76+
@pytest.mark.asyncio
77+
async def test_parse_msg():
78+
"""Test parsing MSG messages."""
7779
# Test valid MSG without reply
78-
subject, sid, reply_to, size = parse_msg_args(["foo.bar", "1", "42"])
79-
assert subject == "foo.bar"
80-
assert sid == "1"
81-
assert reply_to is None
82-
assert size == 42
80+
reader = asyncio.StreamReader()
81+
reader.feed_data(b"hello\r\n")
82+
reader.feed_eof()
83+
84+
msg = await parse_msg(reader, [b"foo.bar", b"1", b"5"])
85+
assert msg.subject == "foo.bar"
86+
assert msg.sid == "1"
87+
assert msg.reply_to is None
88+
assert msg.payload == b"hello"
8389

8490
# Test valid MSG with reply
85-
subject, sid, reply_to, size = parse_msg_args([
86-
"foo.bar", "1", "reply.to", "42"
87-
])
88-
assert subject == "foo.bar"
89-
assert sid == "1"
90-
assert reply_to == "reply.to"
91-
assert size == 42
91+
reader = asyncio.StreamReader()
92+
reader.feed_data(b"hello\r\n")
93+
reader.feed_eof()
94+
95+
msg = await parse_msg(reader, [b"foo.bar", b"1", b"reply.to", b"5"])
96+
assert msg.subject == "foo.bar"
97+
assert msg.sid == "1"
98+
assert msg.reply_to == "reply.to"
99+
assert msg.payload == b"hello"
92100

93101
# Test invalid size
94-
with pytest.raises(ParseError, match="Invalid payload size"):
95-
parse_msg_args(["foo.bar", "1", "invalid"])
102+
reader = asyncio.StreamReader()
103+
with pytest.raises(ValueError):
104+
await parse_msg(reader, [b"foo.bar", b"1", b"invalid"])
96105

97106
# Test not enough arguments
107+
reader = asyncio.StreamReader()
98108
with pytest.raises(ParseError, match="Invalid MSG: not enough arguments"):
99-
parse_msg_args(["foo.bar", "1"])
109+
await parse_msg(reader, [b"foo.bar", b"1"])
100110

101-
# Test too many arguments
102-
with pytest.raises(ParseError, match="Invalid MSG: too many arguments"):
103-
parse_msg_args(["foo.bar", "1", "reply.to", "42", "extra"])
111+
# Test payload too large
112+
reader = asyncio.StreamReader()
113+
with pytest.raises(ParseError, match="Payload too large"):
114+
await parse_msg(reader, [b"foo.bar", b"1", b"67108865"])
104115

105116

106-
def test_parse_hmsg_args():
107-
"""Test parsing HMSG arguments."""
117+
@pytest.mark.asyncio
118+
async def test_parse_hmsg():
119+
"""Test parsing HMSG messages."""
108120
# Test valid HMSG
109-
subject, sid, reply_to, header_size, total_size = parse_hmsg_args([
110-
"foo.bar", "1", "reply.to", "10", "52"
111-
])
112-
assert subject == "foo.bar"
113-
assert sid == "1"
114-
assert reply_to == "reply.to"
115-
assert header_size == 10
116-
assert total_size == 52
121+
reader = asyncio.StreamReader()
122+
header_data = b"NATS/1.0\r\n\r\n"
123+
payload = b"hello"
124+
reader.feed_data(header_data + payload + b"\r\n")
125+
reader.feed_eof()
126+
127+
header_size = len(header_data)
128+
total_size = header_size + len(payload)
129+
msg = await parse_hmsg(reader, [b"foo.bar", b"1", b"reply.to", str(header_size).encode(), str(total_size).encode()])
130+
assert msg.subject == "foo.bar"
131+
assert msg.sid == "1"
132+
assert msg.reply_to == "reply.to"
133+
assert msg.payload == b"hello"
134+
assert msg.headers == {}
117135

118136
# Test invalid sizes
119-
with pytest.raises(ParseError, match="Invalid size values"):
120-
parse_hmsg_args(["foo.bar", "1", "reply.to", "invalid", "52"])
137+
reader = asyncio.StreamReader()
138+
with pytest.raises(ValueError):
139+
await parse_hmsg(reader, [b"foo.bar", b"1", b"reply.to", b"invalid", b"52"])
121140

122141
# Test header size too large
123-
with pytest.raises(ParseError, match="Header too large"):
124-
parse_hmsg_args(["foo.bar", "1", "reply.to", "65537", "65538"])
142+
reader = asyncio.StreamReader()
143+
with pytest.raises(ParseError, match="Headers too large"):
144+
await parse_hmsg(reader, [b"foo.bar", b"1", b"reply.to", b"65537", b"65538"])
125145

126-
# Test header size larger than total
127-
with pytest.raises(ParseError,
128-
match="Header size .* larger than total size"):
129-
parse_hmsg_args(["foo.bar", "1", "reply.to", "52", "10"])
146+
# Test total size too large
147+
reader = asyncio.StreamReader()
148+
with pytest.raises(ParseError, match="Total message too large"):
149+
await parse_hmsg(reader, [b"foo.bar", b"1", b"reply.to", b"10", b"67108865"])
130150

131151
# Test not enough arguments
152+
reader = asyncio.StreamReader()
132153
with pytest.raises(ParseError, match="Invalid HMSG: not enough arguments"):
133-
parse_hmsg_args(["foo.bar", "1", "reply.to", "10"])
134-
135-
# Test too many arguments
136-
with pytest.raises(ParseError, match="Invalid HMSG: too many arguments"):
137-
parse_hmsg_args(["foo.bar", "1", "reply.to", "10", "52", "extra"])
154+
await parse_hmsg(reader, [b"foo.bar", b"1", b"10"])
138155

139156

140157
def test_parse_headers():

0 commit comments

Comments
 (0)