Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 9a6b2f2

Browse files
committed
Batch fetch references.
1 parent fa0ca0f commit 9a6b2f2

File tree

6 files changed

+108
-13
lines changed

6 files changed

+108
-13
lines changed

changelog.d/14508.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Reduce database load of [Client-Server endpoints](https://spec.matrix.org/v1.4/client-server-api/#aggregations) which return bundled aggregations.

synapse/handlers/relations.py

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,46 @@ async def get_annotations_for_events(
320320

321321
return filtered_results
322322

323+
async def get_references_for_events(
324+
self, event_ids: Collection[str], ignored_users: FrozenSet[str] = frozenset()
325+
) -> Dict[str, List[_RelatedEvent]]:
326+
"""Get a list of references to the given events.
327+
328+
Args:
329+
event_ids: Fetch events that relate to this event ID.
330+
ignored_users: The users ignored by the requesting user.
331+
332+
Returns:
333+
A map of event IDs to a list related events.
334+
"""
335+
336+
related_events = await self._main_store.get_references_for_events(event_ids)
337+
338+
# Avoid additional logic if there are no ignored users.
339+
if not ignored_users:
340+
return {
341+
event_id: results
342+
for event_id, results in related_events.items()
343+
if results
344+
}
345+
346+
# Filter out ignored users.
347+
results = {}
348+
for event_id, events in related_events.items():
349+
# If no references, skip.
350+
if not events:
351+
continue
352+
353+
# Filter ignored users out.
354+
events = [event for event in events if event.sender not in ignored_users]
355+
# If there are no events left, skip this event.
356+
if not events:
357+
continue
358+
359+
results[event_id] = events
360+
361+
return results
362+
323363
async def _get_threads_for_events(
324364
self,
325365
events_by_id: Dict[str, EventBase],
@@ -525,18 +565,13 @@ async def get_bundled_aggregations(
525565
"chunk": annotations
526566
}
527567

528-
# Fetch other relations per event.
529-
for event in events_by_id.values():
530-
# Fetch any references to bundle with this event.
531-
references = await self.get_relations_for_event(
532-
event.event_id,
533-
event,
534-
event.room_id,
535-
RelationTypes.REFERENCE,
536-
ignored_users=ignored_users,
537-
)
568+
# Fetch any references to bundle with this event.
569+
references_by_event_id = await self.get_references_for_events(
570+
events_by_id.keys(), ignored_users=ignored_users
571+
)
572+
for event_id, references in references_by_event_id.items():
538573
if references:
539-
results.setdefault(event.event_id, BundledAggregations()).references = {
574+
results.setdefault(event_id, BundledAggregations()).references = {
540575
"chunk": [{"event_id": ev.event_id} for ev in references]
541576
}
542577

synapse/storage/databases/main/cache.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ def _invalidate_caches_for_event(
259259

260260
if relates_to:
261261
self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
262+
self._attempt_to_invalidate_cache("get_references_for_event", (relates_to,))
262263
self._attempt_to_invalidate_cache(
263264
"get_aggregation_groups_for_event", (relates_to,)
264265
)

synapse/storage/databases/main/events.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2049,6 +2049,10 @@ def _handle_redact_relations(
20492049
self.store._invalidate_cache_and_stream(
20502050
txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,)
20512051
)
2052+
if rel_type == RelationTypes.REFERENCE:
2053+
self.store._invalidate_cache_and_stream(
2054+
txn, self.store.get_references_for_event, (redacted_relates_to,)
2055+
)
20522056
if rel_type == RelationTypes.REPLACE:
20532057
self.store._invalidate_cache_and_stream(
20542058
txn, self.store.get_applicable_edit, (redacted_relates_to,)

synapse/storage/databases/main/relations.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -534,6 +534,60 @@ def _get_aggregation_groups_for_users_txn(
534534
"get_aggregation_groups_for_users", _get_aggregation_groups_for_users_txn
535535
)
536536

537+
@cached()
538+
async def get_references_for_event(self, event_id: str) -> List[JsonDict]:
539+
raise NotImplementedError()
540+
541+
@cachedList(cached_method_name="get_references_for_event", list_name="event_ids")
542+
async def get_references_for_events(
543+
self, event_ids: Collection[str]
544+
) -> Mapping[str, Optional[List[_RelatedEvent]]]:
545+
"""Get a list of references to the given events.
546+
547+
Args:
548+
event_ids: Fetch events that relate to these event IDs.
549+
550+
Returns:
551+
A map of event IDs to a list of related event IDs (and their senders).
552+
"""
553+
554+
clause, args = make_in_list_sql_clause(
555+
self.database_engine, "relates_to_id", event_ids
556+
)
557+
args.append(RelationTypes.REFERENCE)
558+
559+
sql = f"""
560+
SELECT relates_to_id, ref.event_id, ref.sender
561+
FROM events AS ref
562+
INNER JOIN event_relations USING (event_id)
563+
INNER JOIN events AS parent ON
564+
parent.event_id = relates_to_id
565+
AND parent.room_id = ref.room_id
566+
WHERE
567+
{clause}
568+
AND relation_type = ?
569+
ORDER BY ref.topological_ordering, ref.stream_ordering
570+
"""
571+
572+
def _get_references_for_events_txn(
573+
txn: LoggingTransaction,
574+
) -> Mapping[str, List[_RelatedEvent]]:
575+
txn.execute(sql, args)
576+
577+
result: Dict[str, List[_RelatedEvent]] = {}
578+
for relates_to_id, event_id, sender in cast(
579+
List[Tuple[str, str, str]], txn
580+
):
581+
result.setdefault(relates_to_id, []).append(
582+
_RelatedEvent(event_id, sender)
583+
)
584+
585+
return result
586+
587+
return await self.db_pool.runInteraction(
588+
"_get_references_for_events_txn", _get_references_for_events_txn
589+
)
590+
537591
@cached()
538592
def get_applicable_edit(self, event_id: str) -> Optional[EventBase]:
539593
raise NotImplementedError()

tests/rest/client/test_relations.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1108,7 +1108,7 @@ def assert_thread(bundled_aggregations: JsonDict) -> None:
11081108

11091109
# The "user" sent the root event and is making queries for the bundled
11101110
# aggregations: they have participated.
1111-
self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 8)
1111+
self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 7)
11121112
# The "user2" sent replies in the thread and is making queries for the
11131113
# bundled aggregations: they have participated.
11141114
#
@@ -1170,7 +1170,7 @@ def assert_thread(bundled_aggregations: JsonDict) -> None:
11701170
bundled_aggregations["latest_event"].get("unsigned"),
11711171
)
11721172

1173-
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 8)
1173+
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 7)
11741174

11751175
def test_nested_thread(self) -> None:
11761176
"""

0 commit comments

Comments
 (0)