diff --git a/nats/src/nats/js/client.py b/nats/src/nats/js/client.py index dc333dea..3903efc9 100644 --- a/nats/src/nats/js/client.py +++ b/nats/src/nats/js/client.py @@ -413,8 +413,9 @@ async def cb(msg): deliver = self._nc.new_inbox() config.deliver_subject = deliver - # Auto created consumers use the filter subject. - config.filter_subject = subject + # Auto created consumers use the filter subject, unless filter_subjects is set. + if not config.filter_subjects: + config.filter_subject = subject # Heartbeats / FlowControl config.flow_control = flow_control @@ -569,9 +570,10 @@ async def main(): if config is None: config = api.ConsumerConfig() - # Auto created consumers use the filter subject. - # config.name = durable - config.filter_subject = subject + # Auto created consumers use the filter subject, unless filter_subjects is set. + if not config.filter_subjects: + config.filter_subject = subject + if durable: config.name = durable config.durable_name = durable diff --git a/nats/tests/test_js.py b/nats/tests/test_js.py index d62a1d13..33d0c79f 100644 --- a/nats/tests/test_js.py +++ b/nats/tests/test_js.py @@ -1056,6 +1056,33 @@ async def test_fetch_heartbeats(self): await nc.close() + @async_long_test + async def test_subscribe_filter_subjects(self): + nc = NATS() + await nc.connect() + + js = nc.jetstream() + + await js.add_stream(name="events", subjects=["events.>"]) + + sub = await js.pull_subscribe( + "events.>", + "filter", + config=nats.js.api.ConsumerConfig( + filter_subjects=["events.1", "events.2"], + ), + ) + for i in range(0, 15): + await js.publish("events.%d" % i, b"i:%d" % i) + msgs = await sub.fetch(20, timeout=5) + assert len(msgs) == 2 + for msg in msgs: + await msg.ack_sync() + info = await js.consumer_info("events", "filter") + assert info.num_pending == 0 + + await nc.close() + class JSMTest(SingleJetStreamServerTestCase): @async_test @@ -1849,6 +1876,37 @@ async def cb_d(msg): await js.delete_stream("pconfig") await nc.close() + @async_long_test + async def test_subscribe_filter_subjects(self): + nc = NATS() + await nc.connect() + + js = nc.jetstream() + + await js.add_stream(name="events", subjects=["events.>"]) + a = [] + + def cb(msg): + a.append(msg) + + sub = await js.subscribe( + "events.>", + "filter", + cb=cb, + config=nats.js.api.ConsumerConfig( + filter_subjects=["events.1", "events.2"], + ), + ) + for i in range(0, 15): + await js.publish("events.%d" % i, b"i:%d" % i) + await asyncio.sleep(1) + assert len(a) == 2 + + info = await sub.consumer_info() + assert info.num_pending == 0 + + await nc.close() + class AckPolicyTest(SingleJetStreamServerTestCase): @async_test