147147import synapse .metrics
148148from synapse .api .presence import UserPresenceState
149149from synapse .events import EventBase
150- from synapse .federation .sender .per_destination_queue import PerDestinationQueue
150+ from synapse .federation .sender .per_destination_queue import (
151+ CATCHUP_RETRY_INTERVAL ,
152+ PerDestinationQueue ,
153+ )
151154from synapse .federation .sender .transaction_manager import TransactionManager
152155from synapse .federation .units import Edu
153156from synapse .logging .context import make_deferred_yieldable , run_in_background
161164 run_as_background_process ,
162165 wrap_as_background_process ,
163166)
164- from synapse .types import JsonDict , ReadReceipt , RoomStreamToken
167+ from synapse .types import JsonDict , ReadReceipt , RoomStreamToken , StrCollection
165168from synapse .util import Clock
166169from synapse .util .metrics import Measure
170+ from synapse .util .retryutils import filter_destinations_by_retry_limiter
167171
168172if TYPE_CHECKING :
169173 from synapse .events .presence_router import PresenceRouter
@@ -213,7 +217,7 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
213217 raise NotImplementedError ()
214218
215219 @abc .abstractmethod
216- def send_presence_to_destinations (
220+ async def send_presence_to_destinations (
217221 self , states : Iterable [UserPresenceState ], destinations : Iterable [str ]
218222 ) -> None :
219223 """Send the given presence states to the given destinations.
@@ -242,9 +246,11 @@ def build_and_send_edu(
242246 raise NotImplementedError ()
243247
244248 @abc .abstractmethod
245- def send_device_messages (self , destination : str , immediate : bool = True ) -> None :
249+ async def send_device_messages (
250+ self , destinations : StrCollection , immediate : bool = True
251+ ) -> None :
246252 """Tells the sender that a new device message is ready to be sent to the
247- destination . The `immediate` flag specifies whether the messages should
253+ destinations . The `immediate` flag specifies whether the messages should
248254 be tried to be sent immediately, or whether it can be delayed for a
249255 short while (to aid performance).
250256 """
@@ -716,6 +722,13 @@ async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
716722 pdu .internal_metadata .stream_ordering ,
717723 )
718724
725+ destinations = await filter_destinations_by_retry_limiter (
726+ destinations ,
727+ clock = self .clock ,
728+ store = self .store ,
729+ retry_due_within_ms = CATCHUP_RETRY_INTERVAL ,
730+ )
731+
719732 for destination in destinations :
720733 self ._get_per_destination_queue (destination ).send_pdu (pdu )
721734
@@ -763,12 +776,20 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
763776 domains_set = await self ._storage_controllers .state .get_current_hosts_in_room_or_partial_state_approximation (
764777 room_id
765778 )
766- domains = [
779+ domains : StrCollection = [
767780 d
768781 for d in domains_set
769782 if not self .is_mine_server_name (d )
770783 and self ._federation_shard_config .should_handle (self ._instance_name , d )
771784 ]
785+
786+ domains = await filter_destinations_by_retry_limiter (
787+ domains ,
788+ clock = self .clock ,
789+ store = self .store ,
790+ retry_due_within_ms = CATCHUP_RETRY_INTERVAL ,
791+ )
792+
772793 if not domains :
773794 return
774795
@@ -816,7 +837,7 @@ def _flush_rrs_for_room(self, room_id: str) -> None:
816837 for queue in queues :
817838 queue .flush_read_receipts_for_room (room_id )
818839
819- def send_presence_to_destinations (
840+ async def send_presence_to_destinations (
820841 self , states : Iterable [UserPresenceState ], destinations : Iterable [str ]
821842 ) -> None :
822843 """Send the given presence states to the given destinations.
@@ -831,13 +852,20 @@ def send_presence_to_destinations(
831852 for state in states :
832853 assert self .is_mine_id (state .user_id )
833854
855+ destinations = await filter_destinations_by_retry_limiter (
856+ [
857+ d
858+ for d in destinations
859+ if self ._federation_shard_config .should_handle (self ._instance_name , d )
860+ ],
861+ clock = self .clock ,
862+ store = self .store ,
863+ retry_due_within_ms = CATCHUP_RETRY_INTERVAL ,
864+ )
865+
834866 for destination in destinations :
835867 if self .is_mine_server_name (destination ):
836868 continue
837- if not self ._federation_shard_config .should_handle (
838- self ._instance_name , destination
839- ):
840- continue
841869
842870 self ._get_per_destination_queue (destination ).send_presence (
843871 states , start_loop = False
@@ -896,21 +924,29 @@ def send_edu(self, edu: Edu, key: Optional[Hashable]) -> None:
896924 else :
897925 queue .send_edu (edu )
898926
899- def send_device_messages (self , destination : str , immediate : bool = True ) -> None :
900- if self .is_mine_server_name (destination ):
901- logger .warning ("Not sending device update to ourselves" )
902- return
903-
904- if not self ._federation_shard_config .should_handle (
905- self ._instance_name , destination
906- ):
907- return
927+ async def send_device_messages (
928+ self , destinations : StrCollection , immediate : bool = True
929+ ) -> None :
930+ destinations = await filter_destinations_by_retry_limiter (
931+ [
932+ destination
933+ for destination in destinations
934+ if self ._federation_shard_config .should_handle (
935+ self ._instance_name , destination
936+ )
937+ and not self .is_mine_server_name (destination )
938+ ],
939+ clock = self .clock ,
940+ store = self .store ,
941+ retry_due_within_ms = CATCHUP_RETRY_INTERVAL ,
942+ )
908943
909- if immediate :
910- self ._get_per_destination_queue (destination ).attempt_new_transaction ()
911- else :
912- self ._get_per_destination_queue (destination ).mark_new_data ()
913- self ._destination_wakeup_queue .add_to_queue (destination )
944+ for destination in destinations :
945+ if immediate :
946+ self ._get_per_destination_queue (destination ).attempt_new_transaction ()
947+ else :
948+ self ._get_per_destination_queue (destination ).mark_new_data ()
949+ self ._destination_wakeup_queue .add_to_queue (destination )
914950
915951 def wake_destination (self , destination : str ) -> None :
916952 """Called when we want to retry sending transactions to a remote.
0 commit comments