Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: check
name: Check
on:
push:
branches:
Expand Down Expand Up @@ -26,8 +26,4 @@ jobs:
run: uv sync --dev

- name: Run format check
run: |
uv run yapf --diff --recursive nats/

- name: Run isort check
run: uv run isort --check-only --diff nats/src
run: uv run ruff format --check
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:

- name: Run tests
run: |
uv run flake8 --ignore="W391, W503, W504, E501" ./nats/src/nats/js/
uv run flake8 --ignore="W391, W503, W504, E501, E203" ./nats/src/nats/js/
uv run pytest -x -vv -s --continue-on-collection-errors ./nats/tests
env:
PATH: $HOME/nats-server:$PATH
Expand Down
24 changes: 8 additions & 16 deletions nats-server/src/nats/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@
FATAL_PATTERN = re.compile(r"\[FTL\]\s+(.*)")
INFO_PATTERN = re.compile(r"\[INF\]\s+(.*)")
READY_PATTERN = re.compile(r"Server is ready")
LISTENING_PATTERN = re.compile(
r"Listening for client connections on (.+):(\d+)"
)
LISTENING_PATTERN = re.compile(r"Listening for client connections on (.+):(\d+)")


class ServerError(Exception):
Expand Down Expand Up @@ -279,7 +277,7 @@ async def wait_ready() -> tuple[str, int]:

if match := LISTENING_PATTERN.search(stderr_line):
host_part = match.group(1)
if host_part.startswith('[') and host_part.endswith(']'):
if host_part.startswith("[") and host_part.endswith("]"):
host = host_part[1:-1]
else:
host = host_part
Expand All @@ -298,13 +296,11 @@ async def wait_ready() -> tuple[str, int]:
if returncode != 0:
msg = f"Server exited with code {returncode}"
if error_lines:
errors = '\n'.join(error_lines)
errors = "\n".join(error_lines)
msg += f"\nErrors:\n{errors}"
raise ServerError(msg)

raise ServerError(
"Server ended without becoming ready"
) # pragma: no cover
raise ServerError("Server ended without becoming ready") # pragma: no cover

return await asyncio.wait_for(wait_ready(), timeout=timeout)

Expand Down Expand Up @@ -413,14 +409,14 @@ async def run_cluster(
node_store_dir = None
if jetstream and store_dir:
# Use as base directory and create subdirectory for each node
node_store_dir = os.path.join(store_dir, f"node{i+1}")
node_store_dir = os.path.join(store_dir, f"node{i + 1}")
os.makedirs(node_store_dir, exist_ok=True)

server = await _run_cluster_node(
config_path=config_path,
port=available_ports[i],
routes=routes,
name=f"node{i+1}",
name=f"node{i + 1}",
cluster_name="cluster",
cluster_port=cluster_ports[i],
jetstream=jetstream,
Expand Down Expand Up @@ -465,9 +461,7 @@ async def _run_cluster_node(
"""
# Build cluster URL and routes string for CLI
cluster_url = f"nats://127.0.0.1:{cluster_port}"
routes_str = ",".join(
f"nats://127.0.0.1:{r}" for r in routes
) if routes else None
routes_str = ",".join(f"nats://127.0.0.1:{r}" for r in routes) if routes else None

process = await _create_server_process(
port=port,
Expand All @@ -480,8 +474,6 @@ async def _run_cluster_node(
config_path=config_path if config_path else None,
)

assigned_host, assigned_port = await _wait_for_server_ready(
process, timeout=10.0
)
assigned_host, assigned_port = await _wait_for_server_ready(process, timeout=10.0)

return Server(process, assigned_host, assigned_port)
7 changes: 3 additions & 4 deletions nats-server/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
def get_nats_server_version():
"""Get the nats-server version or fail if not installed."""
try:
result = subprocess.run(["nats-server", "--version"],
capture_output=True,
check=True,
text=True)
result = subprocess.run(
["nats-server", "--version"], capture_output=True, check=True, text=True
)
return result.stdout.strip() or result.stderr.strip()
except (subprocess.SubprocessError, FileNotFoundError) as e:
pytest.fail(f"nats-server is not installed or not in PATH: {e}")
Expand Down
15 changes: 7 additions & 8 deletions nats-server/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ServerInfo(TypedDict):

See: https://docs.nats.io/reference/reference-protocols/nats-protocol#info
"""

# Required fields
server_id: str
server_name: str
Expand Down Expand Up @@ -264,9 +265,7 @@ async def test_run_with_store_dir_as_file(tmp_path):

# Try to start server with JetStream using a file as store_dir
with pytest.raises(ServerError) as exc_info:
await run(
port=0, jetstream=True, store_dir=str(store_file), timeout=2.0
)
await run(port=0, jetstream=True, store_dir=str(store_file), timeout=2.0)

# Verify the error message indicates the storage directory issue
error_msg = str(exc_info.value).lower()
Expand Down Expand Up @@ -522,9 +521,7 @@ async def test_cluster_with_conflicting_config(tmp_path):
"""Test run_cluster with config that includes cluster settings."""
# The function should still work, merging config with generated cluster setup
cluster = await run_cluster(
"tests/configs/jetstream.conf",
jetstream=True,
store_dir=str(tmp_path)
"tests/configs/jetstream.conf", jetstream=True, store_dir=str(tmp_path)
)

try:
Expand All @@ -549,8 +546,10 @@ async def test_run_with_invalid_host():
with pytest.raises(ServerError) as exc_info:
await run(host="999.999.999.999", port=0, timeout=2.0)

assert "exited" in str(exc_info.value
).lower() or "error" in str(exc_info.value).lower()
assert (
"exited" in str(exc_info.value).lower()
or "error" in str(exc_info.value).lower()
)


async def test_cluster_client_url():
Expand Down
10 changes: 2 additions & 8 deletions nats/benchmark/latency_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ def show_usage_and_die():

async def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"-n", "--iterations", default=DEFAULT_ITERATIONS, type=int
)
parser.add_argument("-n", "--iterations", default=DEFAULT_ITERATIONS, type=int)
parser.add_argument("-S", "--subject", default="test")
parser.add_argument("--servers", default=[], action="append")
args = parser.parse_args()
Expand All @@ -60,11 +58,7 @@ async def handler(msg):
start = time.monotonic()
to_send = args.iterations

print(
"Sending {} request/responses on [{}]".format(
args.iterations, args.subject
)
)
print("Sending {} request/responses on [{}]".format(args.iterations, args.subject))
while to_send > 0:
to_send -= 1
if to_send == 0:
Expand Down
5 changes: 1 addition & 4 deletions nats/benchmark/parser_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@


class DummyNatsClient:

def __init__(self):
self._subs = {}
self._pongs = []
Expand Down Expand Up @@ -40,9 +39,7 @@ async def _process_err(self, err=None):

def generate_msg(subject, nbytes, reply=""):
msg = []
protocol_line = "MSG {subject} 1 {reply} {nbytes}\r\n".format(
subject=subject, reply=reply, nbytes=nbytes
).encode()
protocol_line = "MSG {subject} 1 {reply} {nbytes}\r\n".format(subject=subject, reply=reply, nbytes=nbytes).encode()
msg.append(protocol_line)
msg.append(b"A" * nbytes)
msg.append(b"r\n")
Expand Down
12 changes: 2 additions & 10 deletions nats/benchmark/pub_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,7 @@ async def main():
start = time.time()
to_send = args.count

print(
"Sending {} messages of size {} bytes on [{}]".format(
args.count, args.size, args.subject
)
)
print("Sending {} messages of size {} bytes on [{}]".format(args.count, args.size, args.subject))
while to_send > 0:
for i in range(0, args.batch):
to_send -= 1
Expand All @@ -94,11 +90,7 @@ async def main():

elapsed = time.time() - start
mbytes = "%.1f" % (((args.size * args.count) / elapsed) / (1024 * 1024))
print(
"\nTest completed : {} msgs/sec ({}) MB/sec".format(
args.count / elapsed, mbytes
)
)
print("\nTest completed : {} msgs/sec ({}) MB/sec".format(args.count / elapsed, mbytes))
await nc.close()


Expand Down
20 changes: 4 additions & 16 deletions nats/benchmark/pub_sub_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,7 @@ async def handler(msg):
start = time.time()
to_send = args.count

print(
"Sending {} messages of size {} bytes on [{}]".format(
args.count, args.size, args.subject
)
)
print("Sending {} messages of size {} bytes on [{}]".format(args.count, args.size, args.subject))
while to_send > 0:
for i in range(0, args.batch):
to_send -= 1
Expand All @@ -107,17 +103,9 @@ async def handler(msg):

elapsed = time.time() - start
mbytes = "%.1f" % (((args.size * args.count) / elapsed) / (1024 * 1024))
print(
"\nTest completed : {} msgs/sec sent ({}) MB/sec".format(
args.count / elapsed, mbytes
)
)

print(
"Received {} messages ({} msgs/sec)".format(
received, received / elapsed
)
)
print("\nTest completed : {} msgs/sec sent ({}) MB/sec".format(args.count / elapsed, mbytes))

print("Received {} messages ({} msgs/sec)".format(received, received / elapsed))
await nc.close()


Expand Down
6 changes: 1 addition & 5 deletions nats/benchmark/sub_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,7 @@ async def handler(msg):
elapsed = time.monotonic() - start
print("\nTest completed : {} msgs/sec sent".format(args.count / elapsed))

print(
"Received {} messages ({} msgs/sec)".format(
received, received / elapsed
)
)
print("Received {} messages ({} msgs/sec)".format(received, received / elapsed))
await nc.close()


Expand Down
7 changes: 1 addition & 6 deletions nats/examples/advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@


async def main():

async def disconnected_cb():
print("Got disconnected!")

Expand Down Expand Up @@ -41,11 +40,7 @@ async def request_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print(
"Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data
)
)
print("Received a message on '{subject} {reply}': {data}".format(subject=subject, reply=reply, data=data))

# Signal the server to stop sending messages after we got 10 already.
resp = await nc.request("help.please", b"help")
Expand Down
8 changes: 2 additions & 6 deletions nats/examples/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,13 @@ async def message_handler(msg):

try:
async for msg in sub.messages:
print(
f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}"
)
print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
await sub.unsubscribe()
except Exception as e:
pass

async def help_request(msg):
print(
f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}"
)
print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
await nc.publish(msg.reply, b"I can help")

# Use queue named 'workers' for distributing requests
Expand Down
7 changes: 1 addition & 6 deletions nats/examples/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,14 @@


class Client:

def __init__(self, nc):
self.nc = nc

async def message_handler(self, msg):
print(f"[Received on '{msg.subject}']: {msg.data.decode()}")

async def request_handler(self, msg):
print(
"[Request on '{} {}']: {}".format(
msg.subject, msg.reply, msg.data.decode()
)
)
print("[Request on '{} {}']: {}".format(msg.subject, msg.reply, msg.data.decode()))
await self.nc.publish(msg.reply, b"I can help!")

async def start(self):
Expand Down
6 changes: 1 addition & 5 deletions nats/examples/clustered.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ async def subscribe_handler(msg):
print("Connection closed prematurely.")
break
except ErrTimeout as e:
print(
"Timeout occurred when publishing msg i={}: {}".format(
i, e
)
)
print("Timeout occurred when publishing msg i={}: {}".format(i, e))

end_time = datetime.now()
await nc.drain()
Expand Down
5 changes: 1 addition & 4 deletions nats/examples/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@


class Component:

def __init__(self):
self._nc = None
self._done = asyncio.Future()
Expand Down Expand Up @@ -89,9 +88,7 @@ def signal_handler():
asyncio.create_task(c.close())

for sig in ("SIGINT", "SIGTERM"):
asyncio.get_running_loop().add_signal_handler(
getattr(signal, sig), signal_handler
)
asyncio.get_running_loop().add_signal_handler(getattr(signal, sig), signal_handler)

await c.run_forever()

Expand Down
1 change: 0 additions & 1 deletion nats/examples/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@


async def main():

async def disconnected_cb():
print("Got disconnected!")

Expand Down
9 changes: 2 additions & 7 deletions nats/examples/context-manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,14 @@ async def closed_cb():
is_done.set_result(True)

arguments, _ = args.get_args("Run a context manager example.")
async with await nats.connect(arguments.servers,
closed_cb=closed_cb) as nc:
async with await nats.connect(arguments.servers, closed_cb=closed_cb) as nc:
print(f"Connected to NATS at {nc.connected_url.netloc}...")

async def subscribe_handler(msg):
subject = msg.subject
reply = msg.reply
data = msg.data.decode()
print(
"Received a message on '{subject} {reply}': {data}".format(
subject=subject, reply=reply, data=data
)
)
print("Received a message on '{subject} {reply}': {data}".format(subject=subject, reply=reply, data=data))

await nc.subscribe("discover", cb=subscribe_handler)
await nc.flush()
Expand Down
Loading
Loading