@@ -141,7 +141,7 @@ def decode(self, value: EncodableT, force=False) -> EncodableT:
141141class BaseParser :
142142 """Plain Python parsing class"""
143143
144- __slots__ = "_stream" , "_read_size"
144+ __slots__ = "_stream" , "_read_size" , "_connected"
145145
146146 EXCEPTION_CLASSES : ExceptionMappingT = {
147147 "ERR" : {
@@ -172,6 +172,7 @@ class BaseParser:
172172 def __init__ (self , socket_read_size : int ):
173173 self ._stream : Optional [asyncio .StreamReader ] = None
174174 self ._read_size = socket_read_size
175+ self ._connected = False
175176
176177 def __del__ (self ):
177178 try :
@@ -208,7 +209,7 @@ async def read_response(
208209class PythonParser (BaseParser ):
209210 """Plain Python parsing class"""
210211
211- __slots__ = BaseParser . __slots__ + ("encoder" , "_buffer" , "_pos" , "_chunks" )
212+ __slots__ = ("encoder" , "_buffer" , "_pos" , "_chunks" )
212213
213214 def __init__ (self , socket_read_size : int ):
214215 super ().__init__ (socket_read_size )
@@ -226,28 +227,28 @@ def on_connect(self, connection: "Connection"):
226227 self ._stream = connection ._reader
227228 if self ._stream is None :
228229 raise RedisError ("Buffer is closed." )
229-
230230 self .encoder = connection .encoder
231+ self ._clear ()
232+ self ._connected = True
231233
232234 def on_disconnect (self ):
233235 """Called when the stream disconnects"""
234- if self ._stream is not None :
235- self ._stream = None
236- self .encoder = None
237- self ._clear ()
236+ self ._connected = False
238237
239238 async def can_read_destructive (self ) -> bool :
239+ if not self ._connected :
240+ raise RedisError ("Buffer is closed." )
240241 if self ._buffer :
241242 return True
242- if self ._stream is None :
243- raise RedisError ("Buffer is closed." )
244243 try :
245244 async with async_timeout .timeout (0 ):
246245 return await self ._stream .read (1 )
247246 except asyncio .TimeoutError :
248247 return False
249248
250249 async def read_response (self , disable_decoding : bool = False ):
250+ if not self ._connected :
251+ raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
251252 if self ._chunks :
252253 # augment parsing buffer with previously read data
253254 self ._buffer += b"" .join (self ._chunks )
@@ -261,8 +262,6 @@ async def read_response(self, disable_decoding: bool = False):
261262 async def _read_response (
262263 self , disable_decoding : bool = False
263264 ) -> Union [EncodableT , ResponseError , None ]:
264- if not self ._stream or not self .encoder :
265- raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
266265 raw = await self ._readline ()
267266 response : Any
268267 byte , response = raw [:1 ], raw [1 :]
@@ -350,14 +349,13 @@ async def _readline(self) -> bytes:
350349class HiredisParser (BaseParser ):
351350 """Parser class for connections using Hiredis"""
352351
353- __slots__ = BaseParser . __slots__ + ("_reader" , "_connected" )
352+ __slots__ = ("_reader" ,)
354353
355354 def __init__ (self , socket_read_size : int ):
356355 if not HIREDIS_AVAILABLE :
357356 raise RedisError ("Hiredis is not available." )
358357 super ().__init__ (socket_read_size = socket_read_size )
359358 self ._reader : Optional [hiredis .Reader ] = None
360- self ._connected : bool = False
361359
362360 def on_connect (self , connection : "Connection" ):
363361 self ._stream = connection ._reader
0 commit comments