|
37 | 37 | from synapse.metrics.background_process_metrics import run_as_background_process |
38 | 38 | from synapse.types import ReadReceipt |
39 | 39 | from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter |
| 40 | +from synapse.visibility import filter_events_for_server |
40 | 41 |
|
41 | 42 | if TYPE_CHECKING: |
42 | 43 | import synapse.server |
@@ -77,6 +78,7 @@ def __init__( |
77 | 78 | ): |
78 | 79 | self._server_name = hs.hostname |
79 | 80 | self._clock = hs.get_clock() |
| 81 | + self._storage_controllers = hs.get_storage_controllers() |
80 | 82 | self._store = hs.get_datastores().main |
81 | 83 | self._transaction_manager = transaction_manager |
82 | 84 | self._instance_name = hs.get_instance_name() |
@@ -442,6 +444,12 @@ async def _catch_up_transmission_loop(self) -> None: |
442 | 444 | "This should not happen." % event_ids |
443 | 445 | ) |
444 | 446 |
|
| 447 | + logger.info( |
| 448 | + "Catching up destination %s with %d PDUs", |
| 449 | + self._destination, |
| 450 | + len(catchup_pdus), |
| 451 | + ) |
| 452 | + |
445 | 453 | # We send transactions with events from one room only, as its likely |
446 | 454 | # that the remote will have to do additional processing, which may |
447 | 455 | # take some time. It's better to give it small amounts of work |
@@ -487,19 +495,20 @@ async def _catch_up_transmission_loop(self) -> None: |
487 | 495 | ): |
488 | 496 | continue |
489 | 497 |
|
490 | | - # Filter out events where the server is not in the room, |
491 | | - # e.g. it may have left/been kicked. *Ideally* we'd pull |
492 | | - # out the kick and send that, but it's a rare edge case |
493 | | - # so we don't bother for now (the server that sent the |
494 | | - # kick should send it out if its online). |
495 | | - hosts = await self._state.get_hosts_in_room_at_events( |
496 | | - p.room_id, [p.event_id] |
497 | | - ) |
498 | | - if self._destination not in hosts: |
499 | | - continue |
500 | | - |
501 | 498 | new_pdus.append(p) |
502 | 499 |
|
| 500 | + # Filter out events where the server is not in the room, |
| 501 | + # e.g. it may have left/been kicked. *Ideally* we'd pull |
| 502 | + # out the kick and send that, but it's a rare edge case |
| 503 | + # so we don't bother for now (the server that sent the |
| 504 | + # kick should send it out if its online). |
| 505 | + new_pdus = await filter_events_for_server( |
| 506 | + self._storage_controllers, |
| 507 | + self._destination, |
| 508 | + new_pdus, |
| 509 | + redact=False, |
| 510 | + ) |
| 511 | + |
503 | 512 | # If we've filtered out all the extremities, fall back to |
504 | 513 | # sending the original event. This should ensure that the |
505 | 514 | # server gets at least some of missed events (especially if |
|
0 commit comments