4848components.
4949"""
5050import logging
51- from typing import TYPE_CHECKING , Awaitable , Callable , Dict , List , Optional , Set
51+ from typing import (
52+ TYPE_CHECKING ,
53+ Awaitable ,
54+ Callable ,
55+ Collection ,
56+ Dict ,
57+ List ,
58+ Optional ,
59+ Set ,
60+ )
5261
5362from synapse .appservice import ApplicationService , ApplicationServiceState
5463from synapse .appservice .api import ApplicationServiceApi
7180# Maximum number of ephemeral events to provide in an AS transaction.
7281MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
7382
83+ # Maximum number of to-device messages to provide in an AS transaction.
84+ MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100
85+
7486
7587class ApplicationServiceScheduler :
7688 """Public facing API for this module. Does the required DI to tie the
@@ -97,15 +109,40 @@ async def start(self) -> None:
97109 for service in services :
98110 self .txn_ctrl .start_recoverer (service )
99111
100- def submit_event_for_as (
101- self , service : ApplicationService , event : EventBase
112+ def enqueue_for_appservice (
113+ self ,
114+ appservice : ApplicationService ,
115+ events : Optional [Collection [EventBase ]] = None ,
116+ ephemeral : Optional [Collection [JsonDict ]] = None ,
117+ to_device_messages : Optional [Collection [JsonDict ]] = None ,
102118 ) -> None :
103- self .queuer .enqueue_event (service , event )
119+ """
120+ Enqueue some data to be sent off to an application service.
104121
105- def submit_ephemeral_events_for_as (
106- self , service : ApplicationService , events : List [JsonDict ]
107- ) -> None :
108- self .queuer .enqueue_ephemeral (service , events )
122+ Args:
123+ appservice: The application service to create and send a transaction to.
124+ events: The persistent room events to send.
125+ ephemeral: The ephemeral events to send.
126+ to_device_messages: The to-device messages to send. These differ from normal
127+ to-device messages sent to clients, as they have 'to_device_id' and
128+ 'to_user_id' fields.
129+ """
130+ # We purposefully allow this method to run with empty events/ephemeral
131+ # collections, so that callers do not need to check iterable size themselves.
132+ if not events and not ephemeral and not to_device_messages :
133+ return
134+
135+ if events :
136+ self .queuer .queued_events .setdefault (appservice .id , []).extend (events )
137+ if ephemeral :
138+ self .queuer .queued_ephemeral .setdefault (appservice .id , []).extend (ephemeral )
139+ if to_device_messages :
140+ self .queuer .queued_to_device_messages .setdefault (appservice .id , []).extend (
141+ to_device_messages
142+ )
143+
144+ # Kick off a new application service transaction
145+ self .queuer .start_background_request (appservice )
109146
110147
111148class _ServiceQueuer :
@@ -121,13 +158,15 @@ def __init__(self, txn_ctrl: "_TransactionController", clock: Clock):
121158 self .queued_events : Dict [str , List [EventBase ]] = {}
122159 # dict of {service_id: [events]}
123160 self .queued_ephemeral : Dict [str , List [JsonDict ]] = {}
161+ # dict of {service_id: [to_device_message_json]}
162+ self .queued_to_device_messages : Dict [str , List [JsonDict ]] = {}
124163
125164 # the appservices which currently have a transaction in flight
126165 self .requests_in_flight : Set [str ] = set ()
127166 self .txn_ctrl = txn_ctrl
128167 self .clock = clock
129168
130- def _start_background_request (self , service : ApplicationService ) -> None :
169+ def start_background_request (self , service : ApplicationService ) -> None :
131170 # start a sender for this appservice if we don't already have one
132171 if service .id in self .requests_in_flight :
133172 return
@@ -136,16 +175,6 @@ def _start_background_request(self, service: ApplicationService) -> None:
136175 "as-sender-%s" % (service .id ,), self ._send_request , service
137176 )
138177
139- def enqueue_event (self , service : ApplicationService , event : EventBase ) -> None :
140- self .queued_events .setdefault (service .id , []).append (event )
141- self ._start_background_request (service )
142-
143- def enqueue_ephemeral (
144- self , service : ApplicationService , events : List [JsonDict ]
145- ) -> None :
146- self .queued_ephemeral .setdefault (service .id , []).extend (events )
147- self ._start_background_request (service )
148-
149178 async def _send_request (self , service : ApplicationService ) -> None :
150179 # sanity-check: we shouldn't get here if this service already has a sender
151180 # running.
@@ -162,11 +191,21 @@ async def _send_request(self, service: ApplicationService) -> None:
162191 ephemeral = all_events_ephemeral [:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION ]
163192 del all_events_ephemeral [:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION ]
164193
165- if not events and not ephemeral :
194+ all_to_device_messages = self .queued_to_device_messages .get (
195+ service .id , []
196+ )
197+ to_device_messages_to_send = all_to_device_messages [
198+ :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION
199+ ]
200+ del all_to_device_messages [:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION ]
201+
202+ if not events and not ephemeral and not to_device_messages_to_send :
166203 return
167204
168205 try :
169- await self .txn_ctrl .send (service , events , ephemeral )
206+ await self .txn_ctrl .send (
207+ service , events , ephemeral , to_device_messages_to_send
208+ )
170209 except Exception :
171210 logger .exception ("AS request failed" )
172211 finally :
@@ -198,10 +237,24 @@ async def send(
198237 service : ApplicationService ,
199238 events : List [EventBase ],
200239 ephemeral : Optional [List [JsonDict ]] = None ,
240+ to_device_messages : Optional [List [JsonDict ]] = None ,
201241 ) -> None :
242+ """
243+ Create a transaction with the given data and send to the provided
244+ application service.
245+
246+ Args:
247+ service: The application service to send the transaction to.
248+ events: The persistent events to include in the transaction.
249+ ephemeral: The ephemeral events to include in the transaction.
250+ to_device_messages: The to-device messages to include in the transaction.
251+ """
202252 try :
203253 txn = await self .store .create_appservice_txn (
204- service = service , events = events , ephemeral = ephemeral or []
254+ service = service ,
255+ events = events ,
256+ ephemeral = ephemeral or [],
257+ to_device_messages = to_device_messages or [],
205258 )
206259 service_is_up = await self ._is_service_up (service )
207260 if service_is_up :
0 commit comments