1919
2020from synapse .api .errors import SynapseError
2121from synapse .http .server import HttpServer
22+ from synapse .http .servlet import parse_json_object_from_request
2223from synapse .replication .http ._base import ReplicationEndpoint
2324from synapse .types import JsonDict
2425
@@ -51,10 +52,15 @@ def __init__(self, hs: "HomeServer"):
5152 self ._state_handler = hs .get_state_handler ()
5253 self ._events_shard_config = hs .config .worker .events_shard_config
5354 self ._instance_name = hs .get_instance_name ()
55+ self ._main_store = hs .get_datastores ().main
56+ self ._replication = hs .get_replication_data_handler ()
5457
5558 @staticmethod
56- async def _serialize_payload (room_id : str ) -> JsonDict : # type: ignore[override]
57- return {}
59+ async def _serialize_payload (room_id : str , local_instance_name : str , unpartial_state_events_position : int ) -> JsonDict : # type: ignore[override]
60+ return {
61+ "instance_name" : local_instance_name ,
62+ "unpartial_state_events_position" : unpartial_state_events_position ,
63+ }
5864
5965 async def _handle_request ( # type: ignore[override]
6066 self , request : Request , room_id : str
@@ -65,9 +71,22 @@ async def _handle_request( # type: ignore[override]
6571 400 , "/update_current_state request was routed to the wrong worker"
6672 )
6773
74+ payload = parse_json_object_from_request (request )
75+ await self ._replication .wait_for_stream_position (
76+ payload ["instance_name" ],
77+ "un_partial_stated_event" ,
78+ payload ["unpartial_state_events_position" ],
79+ )
80+
6881 await self ._state_handler .update_current_state (room_id )
6982
70- return 200 , {}
83+ assert self ._main_store ._cache_id_gen is not None
84+
85+ return 200 , {
86+ "caches_position" : self ._main_store ._cache_id_gen .get_current_token_for_writer (
87+ self ._instance_name
88+ )
89+ }
7190
7291
7392def register_servlets (hs : "HomeServer" , http_server : HttpServer ) -> None :
0 commit comments