@@ -22,49 +22,57 @@ def __init__(
2222 self .__executor = executor
2323 self .__lock = threading .Lock () if not is_pyodide () else NopeLock ()
2424 self .__subscribers : Dict [
25- str , Union [Callable , Callable [..., Awaitable [Any ]]]
25+ str , set [ Union [Callable , Callable [..., Awaitable [Any ] ]]]
2626 ] = {} # key: session_id, value: handler
2727 self .__topic_subscribers : Dict [
28- str , Dict [str , Union [Callable , Callable [..., Awaitable [Any ]]]]
28+ str , Dict [str , set [ Union [Callable , Callable [..., Awaitable [Any ] ]]]]
2929 ] = {} # key: topic, value: dict[session_id, handler]
3030 self .__subscriber_topics : Dict [
31- str , Dict [str , Union [Callable , Callable [..., Awaitable [Any ]]]]
31+ str , Dict [str , set [ Union [Callable , Callable [..., Awaitable [Any ] ]]]]
3232 ] = {} # key: session_id, value: dict[topic, handler]
3333
3434 def send_all (self , message : Any ):
3535 logger .debug (f"pubsub.send_all({ message } )" )
3636 with self .__lock :
37- for handler in self .__subscribers .values ():
38- self .__send (handler , [message ])
37+ for handlers in self .__subscribers .values ():
38+ for handler in handlers :
39+ self .__send (handler , [message ])
3940
4041 def send_all_on_topic (self , topic : str , message : Any ):
4142 logger .debug (f"pubsub.send_all_on_topic({ topic } , { message } )" )
4243 with self .__lock :
4344 if topic in self .__topic_subscribers :
44- for handler in self .__topic_subscribers [topic ].values ():
45- self .__send (handler , [topic , message ])
45+ for handlers in self .__topic_subscribers [topic ].values ():
46+ for handler in handlers :
47+ self .__send (handler , [topic , message ])
4648
4749 def send_others (self , except_session_id : str , message : Any ):
4850 logger .debug (f"pubsub.send_others({ except_session_id } , { message } )" )
4951 with self .__lock :
50- for session_id , handler in self .__subscribers .items ():
52+ for session_id , handlers in self .__subscribers .items ():
5153 if except_session_id != session_id :
52- self .__send (handler , [message ])
54+ for handler in handlers :
55+ self .__send (handler , [message ])
5356
5457 def send_others_on_topic (self , except_session_id : str , topic : str , message : Any ):
5558 logger .debug (
5659 f"pubsub.send_others_on_topic({ except_session_id } , { topic } , { message } )"
5760 )
5861 with self .__lock :
5962 if topic in self .__topic_subscribers :
60- for session_id , handler in self .__topic_subscribers [topic ].items ():
63+ for session_id , handlers in self .__topic_subscribers [topic ].items ():
6164 if except_session_id != session_id :
62- self .__send (handler , [topic , message ])
65+ for handler in handlers :
66+ self .__send (handler , [topic , message ])
6367
6468 def subscribe (self , session_id : str , handler : Callable ):
6569 logger .debug (f"pubsub.subscribe({ session_id } )" )
6670 with self .__lock :
67- self .__subscribers [session_id ] = handler
71+ handlers = self .__subscribers .get (session_id )
72+ if handlers is None :
73+ handlers = set ()
74+ self .__subscribers [session_id ] = handlers
75+ handlers .add (handler )
6876
6977 def subscribe_topic (
7078 self ,
@@ -86,12 +94,20 @@ def __subscribe_topic(
8694 if topic_subscribers is None :
8795 topic_subscribers = {}
8896 self .__topic_subscribers [topic ] = topic_subscribers
89- topic_subscribers [session_id ] = handler
97+ handlers = topic_subscribers .get (session_id )
98+ if handlers is None :
99+ handlers = set ()
100+ topic_subscribers [session_id ] = handlers
101+ handlers .add (handler )
90102 subscriber_topics = self .__subscriber_topics .get (session_id )
91103 if subscriber_topics is None :
92104 subscriber_topics = {}
93105 self .__subscriber_topics [session_id ] = subscriber_topics
94- subscriber_topics [topic ] = handler
106+ handlers = subscriber_topics .get (topic )
107+ if handlers is None :
108+ handlers = set ()
109+ subscriber_topics [topic ] = handlers
110+ handlers .add (handler )
95111
96112 def unsubscribe (self , session_id : str ):
97113 logger .debug (f"pubsub.unsubscribe({ session_id } )" )
0 commit comments