Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 5c03134

Browse files
authored
Convert additional database code to async/await. (#8195)
1 parent d5e73cb commit 5c03134

File tree

11 files changed

+246
-175
lines changed

11 files changed

+246
-175
lines changed

changelog.d/8195.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Convert various parts of the codebase to async/await.

synapse/appservice/__init__.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,16 @@
1414
# limitations under the License.
1515
import logging
1616
import re
17+
from typing import TYPE_CHECKING
1718

1819
from synapse.api.constants import EventTypes
20+
from synapse.appservice.api import ApplicationServiceApi
1921
from synapse.types import GroupID, get_domain_from_id
2022
from synapse.util.caches.descriptors import cached
2123

24+
if TYPE_CHECKING:
25+
from synapse.storage.databases.main import DataStore
26+
2227
logger = logging.getLogger(__name__)
2328

2429

@@ -35,30 +40,28 @@ def __init__(self, service, id, events):
3540
self.id = id
3641
self.events = events
3742

38-
def send(self, as_api):
43+
async def send(self, as_api: ApplicationServiceApi) -> bool:
3944
"""Sends this transaction using the provided AS API interface.
4045
4146
Args:
42-
as_api(ApplicationServiceApi): The API to use to send.
47+
as_api: The API to use to send.
4348
Returns:
44-
An Awaitable which resolves to True if the transaction was sent.
49+
True if the transaction was sent.
4550
"""
46-
return as_api.push_bulk(
51+
return await as_api.push_bulk(
4752
service=self.service, events=self.events, txn_id=self.id
4853
)
4954

50-
def complete(self, store):
55+
async def complete(self, store: "DataStore") -> None:
5156
"""Completes this transaction as successful.
5257
5358
Marks this transaction ID on the application service and removes the
5459
transaction contents from the database.
5560
5661
Args:
5762
store: The database store to operate on.
58-
Returns:
59-
A Deferred which resolves to True if the transaction was completed.
6063
"""
61-
return store.complete_appservice_txn(service=self.service, txn_id=self.id)
64+
await store.complete_appservice_txn(service=self.service, txn_id=self.id)
6265

6366

6467
class ApplicationService(object):

synapse/federation/persistence.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
"""
2121

2222
import logging
23+
from typing import Optional, Tuple
2324

2425
from synapse.federation.units import Transaction
2526
from synapse.logging.utils import log_function
@@ -36,25 +37,27 @@ def __init__(self, datastore):
3637
self.store = datastore
3738

3839
@log_function
39-
def have_responded(self, origin, transaction):
40-
""" Have we already responded to a transaction with the same id and
40+
async def have_responded(
41+
self, origin: str, transaction: Transaction
42+
) -> Optional[Tuple[int, JsonDict]]:
43+
"""Have we already responded to a transaction with the same id and
4144
origin?
4245
4346
Returns:
44-
Deferred: Results in `None` if we have not previously responded to
45-
this transaction or a 2-tuple of `(int, dict)` representing the
46-
response code and response body.
47+
`None` if we have not previously responded to this transaction or a
48+
2-tuple of `(int, dict)` representing the response code and response body.
4749
"""
48-
if not transaction.transaction_id:
50+
transaction_id = transaction.transaction_id # type: ignore
51+
if not transaction_id:
4952
raise RuntimeError("Cannot persist a transaction with no transaction_id")
5053

51-
return self.store.get_received_txn_response(transaction.transaction_id, origin)
54+
return await self.store.get_received_txn_response(transaction_id, origin)
5255

5356
@log_function
5457
async def set_response(
5558
self, origin: str, transaction: Transaction, code: int, response: JsonDict
5659
) -> None:
57-
""" Persist how we responded to a transaction.
60+
"""Persist how we responded to a transaction.
5861
"""
5962
transaction_id = transaction.transaction_id # type: ignore
6063
if not transaction_id:

synapse/handlers/federation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1879,8 +1879,8 @@ async def get_persisted_pdu(
18791879
else:
18801880
return None
18811881

1882-
def get_min_depth_for_context(self, context):
1883-
return self.store.get_min_depth(context)
1882+
async def get_min_depth_for_context(self, context):
1883+
return await self.store.get_min_depth(context)
18841884

18851885
async def _handle_new_event(
18861886
self, origin, event, state=None, auth_events=None, backfilled=False

synapse/storage/databases/main/appservice.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ async def set_appservice_state(self, service, state) -> None:
172172
"application_services_state", {"as_id": service.id}, {"state": state}
173173
)
174174

175-
def create_appservice_txn(self, service, events):
175+
async def create_appservice_txn(self, service, events):
176176
"""Atomically creates a new transaction for this application service
177177
with the given list of events.
178178
@@ -209,20 +209,17 @@ def _create_appservice_txn(txn):
209209
)
210210
return AppServiceTransaction(service=service, id=new_txn_id, events=events)
211211

212-
return self.db_pool.runInteraction(
212+
return await self.db_pool.runInteraction(
213213
"create_appservice_txn", _create_appservice_txn
214214
)
215215

216-
def complete_appservice_txn(self, txn_id, service):
216+
async def complete_appservice_txn(self, txn_id, service) -> None:
217217
"""Completes an application service transaction.
218218
219219
Args:
220220
txn_id(str): The transaction ID being completed.
221221
service(ApplicationService): The application service which was sent
222222
this transaction.
223-
Returns:
224-
A Deferred which resolves if this transaction was stored
225-
successfully.
226223
"""
227224
txn_id = int(txn_id)
228225

@@ -258,7 +255,7 @@ def _complete_appservice_txn(txn):
258255
{"txn_id": txn_id, "as_id": service.id},
259256
)
260257

261-
return self.db_pool.runInteraction(
258+
await self.db_pool.runInteraction(
262259
"complete_appservice_txn", _complete_appservice_txn
263260
)
264261

@@ -312,13 +309,13 @@ def _get_last_txn(self, txn, service_id):
312309
else:
313310
return int(last_txn_id[0]) # select 'last_txn' col
314311

315-
def set_appservice_last_pos(self, pos):
312+
async def set_appservice_last_pos(self, pos) -> None:
316313
def set_appservice_last_pos_txn(txn):
317314
txn.execute(
318315
"UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
319316
)
320317

321-
return self.db_pool.runInteraction(
318+
await self.db_pool.runInteraction(
322319
"set_appservice_last_pos", set_appservice_last_pos_txn
323320
)
324321

synapse/storage/databases/main/deviceinbox.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -190,15 +190,15 @@ def get_new_messages_for_remote_destination_txn(txn):
190190
)
191191

192192
@trace
193-
def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
193+
async def delete_device_msgs_for_remote(
194+
self, destination: str, up_to_stream_id: int
195+
) -> None:
194196
"""Used to delete messages when the remote destination acknowledges
195197
their receipt.
196198
197199
Args:
198-
destination(str): The destination server_name
199-
up_to_stream_id(int): Where to delete messages up to.
200-
Returns:
201-
A deferred that resolves when the messages have been deleted.
200+
destination: The destination server_name
201+
up_to_stream_id: Where to delete messages up to.
202202
"""
203203

204204
def delete_messages_for_remote_destination_txn(txn):
@@ -209,7 +209,7 @@ def delete_messages_for_remote_destination_txn(txn):
209209
)
210210
txn.execute(sql, (destination, up_to_stream_id))
211211

212-
return self.db_pool.runInteraction(
212+
await self.db_pool.runInteraction(
213213
"delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
214214
)
215215

synapse/storage/databases/main/e2e_room_keys.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ async def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=Non
151151

152152
return sessions
153153

154-
def get_e2e_room_keys_multi(self, user_id, version, room_keys):
154+
async def get_e2e_room_keys_multi(self, user_id, version, room_keys):
155155
"""Get multiple room keys at a time. The difference between this function and
156156
get_e2e_room_keys is that this function can be used to retrieve
157157
multiple specific keys at a time, whereas get_e2e_room_keys is used for
@@ -166,10 +166,10 @@ def get_e2e_room_keys_multi(self, user_id, version, room_keys):
166166
that we want to query
167167
168168
Returns:
169-
Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
169+
dict[str, dict[str, dict]]: a map of room IDs to session IDs to room key
170170
"""
171171

172-
return self.db_pool.runInteraction(
172+
return await self.db_pool.runInteraction(
173173
"get_e2e_room_keys_multi",
174174
self._get_e2e_room_keys_multi_txn,
175175
user_id,
@@ -283,7 +283,7 @@ def _get_current_version(txn, user_id):
283283
raise StoreError(404, "No current backup version")
284284
return row[0]
285285

286-
def get_e2e_room_keys_version_info(self, user_id, version=None):
286+
async def get_e2e_room_keys_version_info(self, user_id, version=None):
287287
"""Get info metadata about a version of our room_keys backup.
288288
289289
Args:
@@ -293,7 +293,7 @@ def get_e2e_room_keys_version_info(self, user_id, version=None):
293293
Raises:
294294
StoreError: with code 404 if there are no e2e_room_keys_versions present
295295
Returns:
296-
A deferred dict giving the info metadata for this backup version, with
296+
A dict giving the info metadata for this backup version, with
297297
fields including:
298298
version(str)
299299
algorithm(str)
@@ -324,12 +324,12 @@ def _get_e2e_room_keys_version_info_txn(txn):
324324
result["etag"] = 0
325325
return result
326326

327-
return self.db_pool.runInteraction(
327+
return await self.db_pool.runInteraction(
328328
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
329329
)
330330

331331
@trace
332-
def create_e2e_room_keys_version(self, user_id, info):
332+
async def create_e2e_room_keys_version(self, user_id: str, info: dict) -> str:
333333
"""Atomically creates a new version of this user's e2e_room_keys store
334334
with the given version info.
335335
@@ -338,7 +338,7 @@ def create_e2e_room_keys_version(self, user_id, info):
338338
info(dict): the info about the backup version to be created
339339
340340
Returns:
341-
A deferred string for the newly created version ID
341+
The newly created version ID
342342
"""
343343

344344
def _create_e2e_room_keys_version_txn(txn):
@@ -365,7 +365,7 @@ def _create_e2e_room_keys_version_txn(txn):
365365

366366
return new_version
367367

368-
return self.db_pool.runInteraction(
368+
return await self.db_pool.runInteraction(
369369
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
370370
)
371371

@@ -403,13 +403,15 @@ async def update_e2e_room_keys_version(
403403
)
404404

405405
@trace
406-
def delete_e2e_room_keys_version(self, user_id, version=None):
406+
async def delete_e2e_room_keys_version(
407+
self, user_id: str, version: Optional[str] = None
408+
) -> None:
407409
"""Delete a given backup version of the user's room keys.
408410
Doesn't delete their actual key data.
409411
410412
Args:
411-
user_id(str): the user whose backup version we're deleting
412-
version(str): Optional. the version ID of the backup version we're deleting
413+
user_id: the user whose backup version we're deleting
414+
version: Optional. the version ID of the backup version we're deleting
413415
If missing, we delete the current backup version info.
414416
Raises:
415417
StoreError: with code 404 if there are no e2e_room_keys_versions present,
@@ -430,13 +432,13 @@ def _delete_e2e_room_keys_version_txn(txn):
430432
keyvalues={"user_id": user_id, "version": this_version},
431433
)
432434

433-
return self.db_pool.simple_update_one_txn(
435+
self.db_pool.simple_update_one_txn(
434436
txn,
435437
table="e2e_room_keys_versions",
436438
keyvalues={"user_id": user_id, "version": this_version},
437439
updatevalues={"deleted": 1},
438440
)
439441

440-
return self.db_pool.runInteraction(
442+
await self.db_pool.runInteraction(
441443
"delete_e2e_room_keys_version", _delete_e2e_room_keys_version_txn
442444
)

0 commit comments

Comments
 (0)