Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 658b648

Browse files
committed
Process device list updates asynchronously
Carries on from #12321.
1 parent 5c9e39e commit 658b648

File tree

4 files changed

+9
-96
lines changed

4 files changed

+9
-96
lines changed

synapse/config/server.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -680,14 +680,6 @@ def read_config(self, config, **kwargs):
680680
config.get("use_account_validity_in_account_status") or False
681681
)
682682

683-
# This is a temporary option that enables fully using the new
684-
# `device_lists_changes_in_room` without the backwards compat code. This
685-
# is primarily for testing. If enabled the server should *not* be
686-
# downgraded, as it may lead to missing device list updates.
687-
self.use_new_device_lists_changes_in_room = (
688-
config.get("use_new_device_lists_changes_in_room") or False
689-
)
690-
691683
self.rooms_to_exclude_from_sync: List[str] = (
692684
config.get("exclude_rooms_from_sync") or []
693685
)

synapse/handlers/device.py

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -291,12 +291,6 @@ def __init__(self, hs: "HomeServer"):
291291
# On start up check if there are any updates pending.
292292
hs.get_reactor().callWhenRunning(self._handle_new_device_update_async)
293293

294-
# Used to decide if we calculate outbound pokes up front or not. By
295-
# default we do to allow safely downgrading Synapse.
296-
self.use_new_device_lists_changes_in_room = (
297-
hs.config.server.use_new_device_lists_changes_in_room
298-
)
299-
300294
def _check_device_name_length(self, name: Optional[str]) -> None:
301295
"""
302296
Checks whether a device name is longer than the maximum allowed length.
@@ -490,23 +484,9 @@ async def notify_device_update(
490484

491485
room_ids = await self.store.get_rooms_for_user(user_id)
492486

493-
hosts: Optional[Set[str]] = None
494-
if not self.use_new_device_lists_changes_in_room:
495-
hosts = set()
496-
497-
if self.hs.is_mine_id(user_id):
498-
for room_id in room_ids:
499-
joined_users = await self.store.get_users_in_room(room_id)
500-
hosts.update(get_domain_from_id(u) for u in joined_users)
501-
502-
set_tag("target_hosts", hosts)
503-
504-
hosts.discard(self.server_name)
505-
506487
position = await self.store.add_device_change_to_streams(
507488
user_id,
508489
device_ids,
509-
hosts=hosts,
510490
room_ids=room_ids,
511491
)
512492

@@ -528,14 +508,6 @@ async def notify_device_update(
528508
# We may need to do some processing asynchronously.
529509
self._handle_new_device_update_async()
530510

531-
if hosts:
532-
logger.info(
533-
"Sending device list update notif for %r to: %r", user_id, hosts
534-
)
535-
for host in hosts:
536-
self.federation_sender.send_device_messages(host, immediate=False)
537-
log_kv({"message": "sent device update to host", "host": host})
538-
539511
async def notify_user_signature_update(
540512
self, from_user_id: str, user_ids: List[str]
541513
) -> None:

synapse/storage/databases/main/devices.py

Lines changed: 9 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1532,7 +1532,6 @@ async def add_device_change_to_streams(
15321532
self,
15331533
user_id: str,
15341534
device_ids: Collection[str],
1535-
hosts: Optional[Collection[str]],
15361535
room_ids: Collection[str],
15371536
) -> Optional[int]:
15381537
"""Persist that a user's devices have been updated, and which hosts
@@ -1542,9 +1541,6 @@ async def add_device_change_to_streams(
15421541
user_id: The ID of the user whose device changed.
15431542
device_ids: The IDs of any changed devices. If empty, this function will
15441543
return None.
1545-
hosts: The remote destinations that should be notified of the change. If
1546-
None then the set of hosts have *not* been calculated, and will be
1547-
calculated later by a background task.
15481544
room_ids: The rooms that the user is in
15491545
15501546
Returns:
@@ -1556,58 +1552,30 @@ async def add_device_change_to_streams(
15561552

15571553
context = get_active_span_text_map()
15581554

1559-
def add_device_changes_txn(
1560-
txn, stream_ids_for_device_change, stream_ids_for_outbound_pokes
1561-
):
1555+
def add_device_changes_txn(txn, stream_ids):
15621556
self._add_device_change_to_stream_txn(
15631557
txn,
15641558
user_id,
15651559
device_ids,
1566-
stream_ids_for_device_change,
1560+
stream_ids,
15671561
)
15681562

15691563
self._add_device_outbound_room_poke_txn(
15701564
txn,
15711565
user_id,
15721566
device_ids,
15731567
room_ids,
1574-
stream_ids_for_device_change,
1575-
context,
1576-
hosts_have_been_calculated=hosts is not None,
1577-
)
1578-
1579-
# If the set of hosts to send to has not been calculated yet (and so
1580-
# `hosts` is None) or there are no `hosts` to send to, then skip
1581-
# trying to persist them to the DB.
1582-
if not hosts:
1583-
return
1584-
1585-
self._add_device_outbound_poke_to_stream_txn(
1586-
txn,
1587-
user_id,
1588-
device_ids,
1589-
hosts,
1590-
stream_ids_for_outbound_pokes,
1568+
stream_ids,
15911569
context,
15921570
)
15931571

1594-
# `device_lists_stream` wants a stream ID per device update.
1595-
num_stream_ids = len(device_ids)
1596-
1597-
if hosts:
1598-
# `device_lists_outbound_pokes` wants a different stream ID for
1599-
# each row, which is a row per host per device update.
1600-
num_stream_ids += len(hosts) * len(device_ids)
1601-
1602-
async with self._device_list_id_gen.get_next_mult(num_stream_ids) as stream_ids:
1603-
stream_ids_for_device_change = stream_ids[: len(device_ids)]
1604-
stream_ids_for_outbound_pokes = stream_ids[len(device_ids) :]
1605-
1572+
async with self._device_list_id_gen.get_next_mult(
1573+
len(device_ids)
1574+
) as stream_ids:
16061575
await self.db_pool.runInteraction(
16071576
"add_device_change_to_stream",
16081577
add_device_changes_txn,
1609-
stream_ids_for_device_change,
1610-
stream_ids_for_outbound_pokes,
1578+
stream_ids,
16111579
)
16121580

16131581
return stream_ids[-1]
@@ -1702,19 +1670,8 @@ def _add_device_outbound_room_poke_txn(
17021670
room_ids: Collection[str],
17031671
stream_ids: List[str],
17041672
context: Dict[str, str],
1705-
hosts_have_been_calculated: bool,
17061673
) -> None:
1707-
"""Record the user in the room has updated their device.
1708-
1709-
Args:
1710-
hosts_have_been_calculated: True if `device_lists_outbound_pokes`
1711-
has been updated already with the updates.
1712-
"""
1713-
1714-
# We only need to convert to outbound pokes if they are our user.
1715-
converted_to_destinations = (
1716-
hosts_have_been_calculated or not self.hs.is_mine_id(user_id)
1717-
)
1674+
"""Record the user in the room has updated their device."""
17181675

17191676
encoded_context = json_encoder.encode(context)
17201677

@@ -1739,7 +1696,7 @@ def _add_device_outbound_room_poke_txn(
17391696
device_id,
17401697
room_id,
17411698
stream_id,
1742-
converted_to_destinations,
1699+
False,
17431700
encoded_context,
17441701
)
17451702
for room_id in room_ids

tests/federation/test_federation_sender.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from typing import Optional
1515
from unittest.mock import Mock
1616

17-
from parameterized import parameterized_class
1817
from signedjson import key, sign
1918
from signedjson.types import BaseKey, SigningKey
2019

@@ -155,12 +154,6 @@ def test_send_receipts_with_backoff(self):
155154
)
156155

157156

158-
@parameterized_class(
159-
[
160-
{"enable_room_poke_code_path": False},
161-
{"enable_room_poke_code_path": True},
162-
]
163-
)
164157
class FederationSenderDevicesTestCases(HomeserverTestCase):
165158
servlets = [
166159
admin.register_servlets,
@@ -175,7 +168,6 @@ def make_homeserver(self, reactor, clock):
175168
def default_config(self):
176169
c = super().default_config()
177170
c["send_federation"] = True
178-
c["use_new_device_lists_changes_in_room"] = self.enable_room_poke_code_path
179171
return c
180172

181173
def prepare(self, reactor, clock, hs):

0 commit comments

Comments
 (0)