1
1
import asyncio
2
+ import contextlib
2
3
import copy
3
4
import enum
4
5
import errno
56
57
if HIREDIS_AVAILABLE :
57
58
import hiredis
58
59
60
+ if sys .version_info [:2 ] >= (3 , 10 ):
61
+ nullcontext = contextlib .nullcontext ()
62
+ else :
63
+
64
+ class NullContext :
65
+ async def __aenter__ (self ):
66
+ pass
67
+
68
+ async def __aexit__ (self , * args ):
69
+ pass
70
+
71
+ nullcontext = NullContext ()
72
+
59
73
NONBLOCKING_EXCEPTION_ERROR_NUMBERS = {
60
74
BlockingIOError : errno .EWOULDBLOCK ,
61
75
ssl .SSLWantReadError : 2 ,
@@ -918,55 +932,27 @@ async def can_read(self, timeout: float = 0):
918
932
f"Error while reading from { self .host } :{ self .port } : { e .args } "
919
933
)
920
934
921
- async def read_response (self , disable_decoding : bool = False ):
922
- """Read the response from a previously sent command"""
923
- try :
924
- async with self ._lock :
925
- if self .socket_timeout :
926
- async with async_timeout .timeout (self .socket_timeout ):
927
- response = await self ._parser .read_response (
928
- disable_decoding = disable_decoding
929
- )
930
- else :
931
- response = await self ._parser .read_response (
932
- disable_decoding = disable_decoding
933
- )
934
- except asyncio .TimeoutError :
935
- await self .disconnect ()
936
- raise TimeoutError (f"Timeout reading from { self .host } :{ self .port } " )
937
- except OSError as e :
938
- await self .disconnect ()
939
- raise ConnectionError (
940
- f"Error while reading from { self .host } :{ self .port } : { e .args } "
941
- )
942
- except BaseException :
943
- await self .disconnect ()
944
- raise
945
-
946
- if self .health_check_interval :
947
- if sys .version_info [0 :2 ] == (3 , 6 ):
948
- func = asyncio .get_event_loop
949
- else :
950
- func = asyncio .get_running_loop
951
- self .next_health_check = func ().time () + self .health_check_interval
952
-
953
- if isinstance (response , ResponseError ):
954
- raise response from None
955
- return response
956
-
957
- async def read_response_without_lock (self , disable_decoding : bool = False ):
935
+ async def read_response (
936
+ self ,
937
+ disable_decoding : bool = False ,
938
+ timeout : Optional [float ] = None ,
939
+ with_lock : bool = True ,
940
+ ):
958
941
"""Read the response from a previously sent command"""
942
+ read_timeout = timeout if timeout is not None else self .socket_timeout
943
+ lock_ctxt = self ._lock if with_lock else nullcontext
944
+ if not self .is_connected :
945
+ await self .connect ()
959
946
try :
960
- if self .socket_timeout :
961
- async with async_timeout .timeout (self .socket_timeout ):
962
- response = await self ._parser .read_response (
963
- disable_decoding = disable_decoding
964
- )
965
- else :
947
+ async with lock_ctxt , async_timeout .timeout (read_timeout ):
966
948
response = await self ._parser .read_response (
967
949
disable_decoding = disable_decoding
968
950
)
969
951
except asyncio .TimeoutError :
952
+ if timeout is not None :
953
+ # user requested timeout, return None
954
+ return None
955
+ # it was a self.socket_timeout error.
970
956
await self .disconnect ()
971
957
raise TimeoutError (f"Timeout reading from { self .host } :{ self .port } " )
972
958
except OSError as e :
@@ -989,6 +975,14 @@ async def read_response_without_lock(self, disable_decoding: bool = False):
989
975
raise response from None
990
976
return response
991
977
978
+ async def read_response_without_lock (
979
+ self , disable_decoding : bool = False , timeout : Optional [float ] = None
980
+ ):
981
+ """Read the response from a previously sent command"""
982
+ return await self .read_response (
983
+ disable_decoding = disable_decoding , timeout = timeout , with_lock = False
984
+ )
985
+
992
986
def pack_command (self , * args : EncodableT ) -> List [bytes ]:
993
987
"""Pack a series of arguments into the Redis protocol"""
994
988
output = []
0 commit comments